<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="../../resources/logo.png" alt="Intellinum Bootcamp" style="width: 600px; height: 163px">
</div>

# Corrupt Record Handling

Apache Spark&trade; provides ways to handle corrupt records.

## In this lesson you:
* Define corruption logic to handle corrupt records
* Pipe corrupt records into a directory for later analysis

## Working with Corrupt Data

ETL pipelines need robust solutions to handle corrupt data. This is because data corruption scales as the size of data and complexity of the data application grow. Corrupt data includes:  
<br>
* Missing information
* Incomplete information
* Schema mismatch
* Differing formats or data types
* User errors when writing data producers

Since ETL pipelines are built to be automated, production-oriented solutions must ensure pipelines behave as expected. This means that **data engineers must both expect and systematically handle corrupt records.**

In the road map for ETL, this is the **Handle Corrupt Records** step:
<img src="../../resources/ETL-Process-3.png" style="border: 1px solid #aaa; border-radius: 10px 10px 10px 10px; box-shadow: 5px 5px 5px #aaa"/>

In [1]:
#MODE = "LOCAL"
MODE = "CLUSTER"

import sys
import os
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.storagelevel import StorageLevel
from matplotlib import interactive
interactive(True)
import matplotlib.pyplot as plt
%matplotlib inline
import json
import math
import numbers
import numpy as np
import plotly
plotly.offline.init_notebook_mode(connected=True)

sys.path.insert(0,'../../src')
from settings import *

# Download the packaged Python environment only if it is not already present locally
if not os.path.exists('../../libs/pyspark24_py36.zip'):
    !aws s3 cp s3://yuan.intellinum.co/bins/pyspark24_py36.zip ../../libs/pyspark24_py36.zip

try:
    spark.stop()
    print("Stopped a SparkSession")
except Exception as e:
    print("No existing SparkSession detected")
    print("Creating a new SparkSession")

SPARK_DRIVER_MEMORY= "1G"
SPARK_DRIVER_CORE = "1"
SPARK_EXECUTOR_MEMORY= "1G"
SPARK_EXECUTOR_CORE = "1"
SPARK_EXECUTOR_INSTANCES = 12



conf = None
if MODE == "LOCAL":
    os.environ["PYSPARK_PYTHON"] = "/home/yuan/anaconda3/envs/pyspark24_py36/bin/python"
    conf = SparkConf().\
            setAppName("pyspark_etl_03-corrupt-record-handling").\
            setMaster('local[*]').\
            set('spark.driver.maxResultSize', '0').\
            set('spark.jars', '../../libs/mysql-connector-java-5.1.45-bin.jar').\
            set('spark.jars.packages','net.java.dev.jets3t:jets3t:0.9.0,com.google.guava:guava:16.0.1,com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.1')
else:
    os.environ["PYSPARK_PYTHON"] = "./MN/pyspark24_py36/bin/python"
    conf = SparkConf().\
            setAppName("pyspark_etl_03-corrupt-record-handling").\
            setMaster('yarn-client').\
            set('spark.executor.cores', SPARK_EXECUTOR_CORE).\
            set('spark.executor.memory', SPARK_EXECUTOR_MEMORY).\
            set('spark.driver.cores', SPARK_DRIVER_CORE).\
            set('spark.driver.memory', SPARK_DRIVER_MEMORY).\
            set("spark.executor.instances", SPARK_EXECUTOR_INSTANCES).\
            set('spark.sql.files.ignoreCorruptFiles', 'true').\
            set('spark.yarn.dist.archives', '../../libs/pyspark24_py36.zip#MN').\
            set('spark.sql.shuffle.partitions', '5000').\
            set('spark.default.parallelism', '5000').\
            set('spark.driver.maxResultSize', '0').\
            set('spark.jars.packages','net.java.dev.jets3t:jets3t:0.9.0,com.google.guava:guava:16.0.1,com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.1').\
            set('spark.jars', 's3://yuan.intellinum.co/bins/mysql-connector-java-5.1.45-bin.jar')
        

spark = SparkSession.builder.\
    config(conf=conf).\
    getOrCreate()


sc = spark.sparkContext

sc.addPyFile('../../src/settings.py')

# Route s3:// URIs through the S3A filesystem implementation
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("mapreduce.fileoutputcommitter.algorithm.version", "2")

def display(df, limit=10):
    return df.limit(limit).toPandas()

def dfTest(id, expected, result):
    assert str(expected) == str(result), "{} does not equal expected {}".format(result, expected)

No existing SparkSession detected
Creating a new SparkSession


Run the following cell, which contains a corrupt record, `{"a": 1, "b, "c":10}`:

This is not the preferred way to make a DataFrame. The code only mimics a corrupt record you might see in production.

In [2]:
data = """{"a": 1, "b":2, "c":3}|{"a": 1, "b":2, "c":3}|{"a": 1, "b, "c":10}""".split('|')

corruptDF = (spark.read
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .json(sc.parallelize(data))
)

display(corruptDF)

Unnamed: 0,_corrupt_record,a,b,c
0,,1.0,2.0,3.0
1,,1.0,2.0,3.0
2,"{""a"": 1, ""b, ""c"":10}",,,


In the previous results, Spark stored the raw text of the corrupt record in its own column, `_corrupt_record`, and processed the other records as expected. This is the default behavior for corrupt records, so you didn't technically need to set the `mode` and `columnNameOfCorruptRecord` options.

There are three different options for handling corrupt records, set through the reader's `mode` option. The accepted values are [defined in `ParseMode`](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseMode.scala#L34):

| `mode` value | Behavior |
|-------------|----------|
| `PERMISSIVE` | Includes corrupt records in a "_corrupt_record" column (by default) |
| `DROPMALFORMED` | Ignores all corrupted records |
| `FAILFAST` | Throws an exception when it meets corrupted records |
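
To make the difference between the three modes concrete, here is a plain-Python analogy built on the standard `json` module. It is only a sketch of the behavior described in the table, not how Spark implements parsing; the `parse_lines` helper is hypothetical and assumes every well-formed line is a JSON object.

```python
import json

MODES = ("PERMISSIVE", "DROPMALFORMED", "FAILFAST")

def parse_lines(lines, mode="PERMISSIVE", corrupt_col="_corrupt_record"):
    """Hypothetical helper that mimics the three parse modes on a list of JSON strings."""
    if mode not in MODES:
        raise ValueError("unknown mode: {}".format(mode))
    rows = []
    for line in lines:
        try:
            row = json.loads(line)
        except ValueError:  # json.JSONDecodeError is a subclass of ValueError
            if mode == "FAILFAST":
                raise  # stop at the first malformed record
            if mode == "PERMISSIVE":
                rows.append({corrupt_col: line})  # keep the raw text for later analysis
            continue  # DROPMALFORMED: skip the record
        if mode == "PERMISSIVE":
            row[corrupt_col] = None  # well-formed rows have no corrupt text
        rows.append(row)
    return rows

data = ['{"a": 1, "b":2, "c":3}', '{"a": 1, "b":2, "c":3}', '{"a": 1, "b, "c":10}']
permissive = parse_lines(data, "PERMISSIVE")    # three rows, the last holds only the raw text
dropped = parse_lines(data, "DROPMALFORMED")    # two rows, the malformed one is gone
```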

The following cell acts on the same data but drops corrupt records:

In [3]:
data = """{"a": 1, "b":2, "c":3}|{"a": 1, "b":2, "c":3}|{"a": 1, "b, "c":10}""".split('|')

corruptDF = (spark.read
  .option("mode", "DROPMALFORMED")
  .json(sc.parallelize(data))
)
display(corruptDF)

Unnamed: 0,a,b,c
0,1,2,3
1,1,2,3


The following cell throws an error once a corrupt record is found, rather than ignoring or saving the corrupt records:

In [4]:
try:
    data = """{"a": 1, "b":2, "c":3}|{"a": 1, "b":2, "c":3}|{"a": 1, "b, "c":10}""".split('|')

    corruptDF = (spark.read
      .option("mode", "FAILFAST")
      .json(sc.parallelize(data))
    )
    display(corruptDF)
    
except Exception as e:
    print(e)

An error occurred while calling o191.json.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4999 in stage 16.0 failed 4 times, most recent failure: Lost task 4999.3 in stage 16.0 (TID 25002, ip-172-31-47-2.us-east-2.compute.internal, executor 7): org.apache.spark.SparkException: Malformed records are detected in schema inference. Parse Mode: FAILFAST.
	at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1$$anonfun$apply$1.apply(JsonInferSchema.scala:66)
	at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1$$anonfun$apply$1.apply(JsonInferSchema.scala:53)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
	at scala.collection.Iterator$class.isEmpty(Iterator.scala:331)
	at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1334)
	at scala.collection.TraversableOnce$class.reduceLeftOption(TraversableOnce.scala:203)
	at scala.collection.AbstractIterator.r

### Recommended Pattern: `badRecordsPath`

Databricks Runtime has [a built-in feature](https://docs.databricks.com/spark/latest/spark-sql/handling-bad-records.html) that saves corrupt records to a given end point. To use this, set the `badRecordsPath`.

This is a preferred design pattern since it persists the corrupt records for later analysis even after the cluster shuts down.

In [5]:
#TODO
YOUR_FIRST_NAME = "rajeev"

In [6]:
basePath = "s3://temp.intellinum.co/{}/etl1p".format(YOUR_FIRST_NAME)
myBadRecords = "{}/badRecordsPath".format(basePath)

print("""Your temp directory is "{}" """.format(myBadRecords))

Your temp directory is "s3://temp.intellinum.co/rajeev/etl1p/badRecordsPath" 


In [7]:
!aws s3 ls temp.intellinum.co/rajeev/

                           PRE ipCount.parquet/
                           PRE serverErrorDF.parquet/


In [8]:
data = """{"a": 1, "b":2, "c":3}|{"a": 1, "b":2, "c":3}|{"a": 1, "b, "c":10}""".split('|')

corruptDF = (spark.read
  .option("badRecordsPath", myBadRecords)
  .json(sc.parallelize(data))
)
display(corruptDF)

Unnamed: 0,_corrupt_record,a,b,c
0,,1.0,2.0,3.0
1,,1.0,2.0,3.0
2,"{""a"": 1, ""b, ""c"":10}",,,


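On Databricks, the corrupt records are written under the path specified by `myBadRecords`.

Recall that this directory is backed by S3 and is available to all clusters.

Note: **`badRecordsPath` is supported only on Databricks Runtime (3.0 and above), not on open-source Apache Spark.** This notebook runs on open-source Spark, so the option is ignored: the corrupt record still appears in `_corrupt_record` in the output above, and nothing is written to `myBadRecords`, which is why the next cell fails with `Path does not exist`.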

In [9]:
path = "{}/*/*/*".format(myBadRecords)
display(spark.read.json(path))

AnalysisException: 'Path does not exist: s3://temp.intellinum.co/rajeev/etl1p/badRecordsPath/*/*/*;'
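
On open-source Spark, a similar effect can be approximated by reading in `PERMISSIVE` mode, writing the corrupt rows to their own location, and keeping the rest. The sketch below is one possible approach, not a drop-in replacement for `badRecordsPath`: the function name `split_corrupt_records` and the example paths are hypothetical, and the schema passed in is assumed to include the corrupt-record column as a string field.

```python
def split_corrupt_records(spark, source_path, schema, bad_records_path,
                          corrupt_col="_corrupt_record"):
    """Hypothetical helper: persist corrupt JSON rows as text and return the clean rows."""
    df = (spark.read
          .schema(schema)  # StructType or DDL string; must contain corrupt_col as a string
          .option("mode", "PERMISSIVE")
          .option("columnNameOfCorruptRecord", corrupt_col)
          .json(source_path)
          # Since Spark 2.3, queries that reference only the corrupt-record column
          # are disallowed on raw JSON files unless the parsed result is cached first.
          .cache())

    # Save the raw text of every malformed record for later analysis
    bad = df.filter("{} IS NOT NULL".format(corrupt_col)).select(corrupt_col)
    bad.write.mode("append").text(bad_records_path)

    # Hand back only the well-formed rows, without the helper column
    return df.filter("{} IS NULL".format(corrupt_col)).drop(corrupt_col)

# Example call (hypothetical paths; requires an active SparkSession named `spark`):
# cleanDF = split_corrupt_records(spark, "s3://my-bucket/input/*.json",
#                                 "a INT, b INT, c INT, _corrupt_record STRING",
#                                 "s3://my-bucket/bad-records/")
```

Because the corrupt rows are written to durable storage, they remain available after the cluster shuts down, which is the property that makes `badRecordsPath` attractive in the first place.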

## Exercise 1: Working with Corrupt Records

### Step 1: Diagnose the Problem

Import the data used in the last lesson, which is located at `s3://data.intellinum.co/bootcamp/common/UbiqLog4UCI/14_F/log*`.  Import the corrupt records in a new column `SMSCorrupt`.  <br>

Save only the columns `SMS` and `SMSCorrupt` to the new DataFrame `SMSCorruptDF`.

In [54]:
# TODO
from pyspark.sql.types import StructType, StructField, StringType

path = "s3://data.intellinum.co/bootcamp/common/UbiqLog4UCI/14_F/log*"
schema = StructType([
    StructField('SMS', StringType(), True),
    StructField('_corrupt_record', StringType(), True)
])
# Keep only the rows Spark could not parse (non-null corrupt-record column)
SMSDF = spark.read.schema(schema).json(path).filter(F.col('_corrupt_record').isNotNull())

In [55]:
display(SMSDF)

Unnamed: 0,SMS,_corrupt_record
0,,"{""SMS"":{""Address"":""+98912503####"",""type"":""1"",""..."
1,,"{""SMS"":{""Address"":""+98912503####"",""type"":""1"",""..."
2,,"{""SMS"":{""Address"":""+98912503####"",""type"":""1"",""..."
3,,"{""SMS"":{""Address"":""+98912503####"",""type"":""1"",""..."
4,,"{""SMS"":{""Address"":""+98912503####"",""type"":""1"",""..."
5,,"{""SMS"":{""Address"":""+98912503####"",""type"":""1"",""..."
6,,"{""SMS"":{""Address"":""+98912503####"",""type"":""1"",""..."
7,,"{""SMS"":{""Address"":""+98912503####"",""type"":""1"",""..."


In [56]:
SMSCorruptDF = SMSDF.select(F.col('SMS'), F.col('_corrupt_record').alias('SMSCorrupt'))
SMSCorruptDF.printSchema()

root
 |-- SMS: string (nullable = true)
 |-- SMSCorrupt: string (nullable = true)



In [57]:
# TEST - Run this cell to test your solution
cols = set(SMSCorruptDF.columns)
SMSCount = SMSCorruptDF.cache().count()

dfTest("ET1-P-06-01-01", True, "SMS" in cols)
dfTest("ET1-P-06-01-02", True, "SMSCorrupt" in cols)
dfTest("ET1-P-06-01-03", 8, SMSCount)

print("Tests passed!")

Tests passed!
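
As an alternative to renaming `_corrupt_record` afterwards, the reader can be asked to place malformed rows directly in a column called `SMSCorrupt`. This is a sketch under two assumptions: the helper name `read_with_named_corrupt_column` is hypothetical, and the schema is given as a DDL-formatted string, which `DataFrameReader.schema` accepts from Spark 2.3 onward.

```python
def read_with_named_corrupt_column(spark, path, corrupt_col="SMSCorrupt"):
    """Hypothetical helper: read JSON and return only the rows Spark could not parse."""
    # The corrupt-record column must appear in the schema as a string field
    ddl_schema = "SMS STRING, {} STRING".format(corrupt_col)
    df = (spark.read
          .schema(ddl_schema)
          .option("columnNameOfCorruptRecord", corrupt_col)
          .json(path))
    return df.filter("{} IS NOT NULL".format(corrupt_col))

# Example call (requires an active SparkSession named `spark`):
# SMSCorruptDF = read_with_named_corrupt_column(
#     spark, "s3://data.intellinum.co/bootcamp/common/UbiqLog4UCI/14_F/log*")
```

If you only count the result, cache it first as the test cell does, since Spark 2.3 and later reject queries on raw JSON files that reference nothing but the corrupt-record column.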


Examine the corrupt records to determine what the problem is with the bad records.

**Hint:** Take a look at the name in metadata.

In the entry `{"name": "mr Khojasteh"flash""}`, the unescaped double quotes around `flash` are interpreted as the end of the value, which makes the record invalid JSON. With single quotes around `flash`, it parses correctly: `{"name": "mr Khojasteh'flash'"}`.

The optimal solution is to fix the initial producer of the data to correct the problem at its source.  In the meantime, you could write ad hoc logic to turn this into a readable field.
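
Such ad hoc logic could look like the following sketch. It is deliberately narrow: the `repair_name_quotes` helper is hypothetical, it only touches the `name` field, and it assumes the value ends at the last double quote before a comma or closing brace.

```python
import json
import re

# Captures the "name" key, its value, and the closing quote that precedes "," or "}"
NAME_VALUE = re.compile(r'("name"\s*:\s*")(.*?)("\s*[,}])')

def repair_name_quotes(raw):
    """Hypothetical helper: swap double quotes inside the "name" value for single quotes."""
    def fix(match):
        return match.group(1) + match.group(2).replace('"', "'") + match.group(3)
    return NAME_VALUE.sub(fix, raw)

bad = '{"name": "mr Khojasteh"flash""}'
repaired = repair_name_quotes(bad)
record = json.loads(repaired)  # parses now that the inner quotes are single quotes
```

A rule like this is a stopgap for one known defect. It should be retired once the data producer is fixed.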

### Step 2: Use `badRecordsPath`

Use the `badRecordsPath` option to save corrupt records to the directory specified by the `corruptPath` variable below.

In [73]:
# TODO
# sc.parallelize() needs a local collection, not a DataFrame, so read the source files directly.
# corruptPath mirrors the directory that the test cell below reads from.
corruptPath = "{}/corruptSMS".format(basePath)
SMSCorruptDF2 = spark.read.option('badRecordsPath', corruptPath).json("s3://data.intellinum.co/bootcamp/common/UbiqLog4UCI/14_F/log*")



In [3]:
# TEST - Run this cell to test your solution
SMSCorruptDF2.count()

testPath = "{}/corruptSMS/*/*/*".format(basePath)
corruptCount = spark.read.json(testPath).count()

dfTest("ET1-P-06-02-01", True, corruptCount >= 8)

print("Tests passed!")

NameError: name 'SMSCorruptDF2' is not defined

One last step... let's clean up our temp files:

In [2]:
basePath

NameError: name 'basePath' is not defined

In [1]:
!aws s3 rm --recursive {basePath}


usage: aws s3 rm <S3Uri>
Error: Invalid argument type


## Review
**Question:** By default, how are corrupt records dealt with using `spark.read.json()`?  
**Answer:** They appear in a column called `_corrupt_record`.

**Question:** How can a query persist corrupt records in a separate destination?  
**Answer:** Databricks Runtime (3.0 and above) supports a feature called `badRecordsPath` that lets a query save corrupt records to a given end point so the pipeline engineer can investigate corruption issues. This feature is not yet available in open-source Apache Spark.

&copy; 2019 [Intellinum Analytics, Inc](http://www.intellinum.co). All rights reserved.<br/>