# ETL with Spark

This notebook serves as a testing ground for the Spark ETL job for the script `src/data/etl_spark.py`.

In [1]:
## IMPORTS
# PYSPARK
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import to_timestamp, countDistinct, to_date

# MISCELLANEOUS
from sys import path # import helper_functions.py
path.insert(1, '../src/data') # insert at 1, 0 is the script path (or '' in REPL)
import helper_functions as h

## Explore core specifications

In [2]:
# create session
spark = SparkSession\
    .builder \
    .appName("ETL for esg") \
    .getOrCreate()

print("Spark context:\n")
print(*spark.sparkContext.getConf().getAll(), sep='\n')

# read in first gtrends files
# preprocessed query output
path_input = h.get_files('../data/raw', name_contains="*gtrends*preprocessed*")[0]
df_raw = spark.read.csv(path_input, header=True)
# query input
path_input = h.get_files('../data/raw', name_contains="*gtrends*meta*")[0]
df_raw_meta = spark.read.csv(path_input, header=True)

print('-'*40)
print(f"Load file:{path_input}")


# print description
print('-'*40)
print(df_raw.printSchema())
print('-'*40)
df_nrows = df_raw.count()
df_ncols = len(df_raw.columns)
print("Rows={}, Columns={}".format(df_nrows, df_ncols))
print('-'*40)
print("METADATA")
print(df_raw_meta.printSchema())
print('-'*40)
df_meta_nrows = df_raw_meta.count()
df_meta_ncols = len(df_raw_meta.columns)
print("Rows={}, Columns={}".format(df_meta_nrows, df_meta_ncols))

Spark context:

('spark.driver.host', 'host.docker.internal')
('spark.app.id', 'local-1604562321954')
('spark.rdd.compress', 'True')
('spark.app.name', 'ETL for esg')
('spark.serializer.objectStreamReset', '100')
('spark.master', 'local[*]')
('spark.submit.pyFiles', '')
('spark.executor.id', 'driver')
('spark.submit.deployMode', 'client')
('spark.driver.port', '57250')
('spark.ui.showConsoleProgress', 'true')
----------------------------------------
Load file:C:\Users\Philipp\GDrive\Projekter\Courses\Udacity Data Engineer\5 Capstone Project\data_engineer_capstone\data\raw\20201017-191627gtrends_metadata.csv
----------------------------------------
root
 |-- date: string (nullable = true)
 |-- keyword: string (nullable = true)
 |-- search_interest: string (nullable = true)

None
----------------------------------------
Rows=998325, Columns=3
----------------------------------------
METADATA
root
 |-- topic: string (nullable = true)
 |-- positive: string (nullable = true)
 |-- date_defin

In [3]:
spark

# Preprocessing

1. rename columns
2. set column types (date, string, int)

## Gtrends

In [4]:
# rename columns
df_renamed = df_raw.withColumnRenamed("_c0","date")\
    .withColumnRenamed("_c1","keyword")\
    .withColumnRenamed("_c2","search_interest")

# cast column types
df = df_renamed.withColumn("search_interest", df_renamed["search_interest"].cast(IntegerType()))\
    .withColumn('date', to_date(df_renamed.date))

## Metadata (API input)

1. convert date strings to date type

In [5]:
# change date string to date type
df_meta = df_raw_meta.withColumn('date_define_topic', to_date(df_raw_meta.date_define_topic))\
    .withColumn('date_get_firmname', to_date(df_raw_meta.date_get_firmname))\
    .withColumn('date_construct_keyword', to_date(df_raw_meta.date_construct_keyword))\
    .withColumn('date_query_googletrends', to_date(df_raw_meta.date_query_googletrends))

# Validation

Validation is done with `Spark SQL` due to learning purposes.

To validate the dataset, the input data from `metadata` can be compared to the API output `gtrends`. 



In [6]:
# create temporary view for SQL
df.createOrReplaceTempView("df")
df_meta.createOrReplaceTempView("meta")

#### Distinct counts of keyword and date

In [7]:
# count distinct keywords, dates and 
kw_count = spark.sql("""
    SELECT COUNT(DISTINCT keyword), COUNT(DISTINCT date)
    FROM df
    """).collect()

distinct_kw_date = [i for i in kw_count[0]]
print("\tDISTINCT \nkeywords \tdates \t=dates*keywords")
print("-"*40)
print("{}\t\t {}\t {}".format(distinct_kw_date[0], distinct_kw_date[1], distinct_kw_date[0]*distinct_kw_date[1]))

	DISTINCT 
keywords 	dates 	=dates*keywords
----------------------------------------
3825		 261	 998325


#### Set difference Input//Output

Store delta between input and output as `failed_queries`

In [99]:
# get set difference of meta (input)/df (output)
set_difference_keywords = spark.sql("""
    SELECT DISTINCT in.keyword
    FROM meta AS in
    WHERE in.keyword NOT IN (
        SELECT DISTINCT out.keyword 
        FROM df AS out)
""")

In [120]:
# get date range of queries and add to metadata
# most recent date record
date_first = spark.sql("""
    SELECT DISTINCT date
    FROM df
    ORDER BY date DESC
    LIMIT 1
""").collect ()

# first date record
date_last = spark.sql("""
    SELECT DISTINCT date
    FROM df
    ORDER BY date ASC
    LIMIT 1
""").collect()

print("First (earliest) date record: {}".format(date_first[0].date))
print("Last (latest) date record: {}".format(date_last[0].date))

First (earliest) date record: 2020-10-11
Last (latest) date record: 2015-10-18


# Export in parquet to S3

The previous steps explored and validated the query input and output data. We arrive at the following datasets which are exported in parquet format to AWS S3.  

1. Query output: Google Trends search interest
2. Query input: Google Trends meta data
3. Query failure: Not every query resulted in  Difference between (1) query input and (2) query output

In [121]:
print(set_difference_keywords.show())

+--------------------+
|             keyword|
+--------------------+
|hate Alexion Phar...|
|issue American In...|
|problem Alexion P...|
|scam American Int...|
|bad Alexion Pharm...|
|strike American I...|
|controversial Ame...|
|lawsuit Alexion P...|
|trouble American ...|
|unfair Alexion Ph...|
+--------------------+

None


In [119]:
spark.conf.set("spark.sql.parquet.compression.codec", "snappy")
df_meta.write.save("./query_metadata.parquet", format="parquet", mode='overwrite')
set_difference_keywords.write.save("./query_metadata.parquet", format="parquet", mode='overwrite')
df.write.parquet("s3://esg-analytics/processed_gtrends.parquet", mode='overwrite')

Py4JJavaError: An error occurred while calling o444.save.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:226)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:178)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:963)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:963)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:415)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:399)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:288)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 112.0 failed 1 times, most recent failure: Lost task 0.0 in stage 112.0 (TID 3601, host.docker.internal, executor driver): java.io.IOException: (null) entry in command string: null chmod 0644 C:\Users\Philipp\GDrive\Projekter\Courses\Udacity Data Engineer\5 Capstone Project\data_engineer_capstone\notebooks\query_metadata.parquet\_temporary\0\_temporary\attempt_20201105145621_0112_m_000000_3601\part-00000-0f4cea1c-1fb7-494d-a285-c0eb2f550bfb-c000.snappy.parquet
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:869)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:852)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:225)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209)
	at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:398)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:461)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
	at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
	at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:248)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:390)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:349)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:150)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:126)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:111)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:264)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:195)
	... 32 more
Caused by: java.io.IOException: (null) entry in command string: null chmod 0644 C:\Users\Philipp\GDrive\Projekter\Courses\Udacity Data Engineer\5 Capstone Project\data_engineer_capstone\notebooks\query_metadata.parquet\_temporary\0\_temporary\attempt_20201105145621_0112_m_000000_3601\part-00000-0f4cea1c-1fb7-494d-a285-c0eb2f550bfb-c000.snappy.parquet
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:869)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:852)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:225)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209)
	at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:398)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:461)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
	at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
	at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:248)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:390)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:349)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:150)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:126)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:111)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:264)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more


In [None]:
# store in parquet
# df.write.save("namesAndAges.parquet", format="parquet")

df.write.parquet("output/proto.parquet")
df.write.parquet("s3://sparkbyexamples/parquet/people.parquet")
# TODO: Store on S3

# Spark Script Example: Random text classification

In [None]:
# pyspark
import argparse

from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.sql.functions import array_contains


def random_text_classifier(input_loc, output_loc):
    """
    This is a dummy function to show how to use spark, It is supposed to mock
    the following steps
        1. clean input data
        2. use a pre-trained model to make prediction 
        3. write predictions to a HDFS output

    Since this is meant as an example, we are going to skip building a model,
    instead we are naively going to mark reviews having the text "good" as positive and
    the rest as negative 
    """

    # read input
    df_raw = spark.read.option("header", True).csv(input_loc)
    # perform text cleaning

    # Tokenize text
    tokenizer = Tokenizer(inputCol="review_str", outputCol="review_token")
    df_tokens = tokenizer.transform(df_raw).select("cid", "review_token")

    # Remove stop words
    remover = StopWordsRemover(inputCol="review_token", outputCol="review_clean")
    df_clean = remover.transform(df_tokens).select("cid", "review_clean")

    # function to check presence of good
    df_out = df_clean.select(
        "cid", array_contains(df_clean.review_clean, "good").alias("positive_review")
    )
    # parquet is a popular column storage format, we use it here
    df_out.write.mode("overwrite").parquet(output_loc)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", type=str, help="HDFS input", default="/movie")
    parser.add_argument("--output", type=str, help="HDFS output", default="/output")
    args = parser.parse_args()
    spark = SparkSession.builder.appName("Random Text Classifier").getOrCreate()
    random_text_classifier(input_loc=args.input, output_loc=args.output)
