# Data Engineer Take Home Exercise

## Question
### Data Prep

Write a script to transform the input CSV into the desired output CSV and Parquet files.

You will find a CSV file in the files folder under `data.csv`. There are three steps to this part of the test. Each step concerns manipulating the values for a single field according to the step's requirements. The steps are as follows:

**String cleaning** - The bio field contains text with arbitrary padding, spacing and line breaks. Normalize these values to a space-delimited string.

**Code swap** - There is a supplementary CSV in the files folder under `state_abbreviations`. This "data dictionary" contains state abbreviations alongside state names. For the state field of the input CSV, replace each state abbreviation with its associated state name from the data dictionary.

**Date offset** - The start_date field contains data in a variety of formats. These may include e.g., "June 23, 1912" or "5/11/1930" (month, day, year). But not all values are valid dates. Invalid dates may include e.g., "June 2018", "3/06" (incomplete dates) or even arbitrary natural language. Add a start_date_description field adjacent to the start_date column to filter invalid date values into. Normalize all valid date values in start_date to ISO 8601 (i.e., YYYY-MM-DD).

Your script should take `data.csv` as input and produce cleansed `enriched.csv` and `enriched.snappy.parquet` files according to the step requirements above.

## Submission Guidelines
We ask that your solutions be implemented in Python (3.8 or newer) or PySpark (3.3 or newer). If you would like to present skills for both approaches, feel free to prepare two separate Jupyter notebooks. Assume that the code will be used monthly to process the data and store it in an AWS S3-based data lake. With that assumption, please be prepared to discuss how this code can be scheduled and how the outputs should be stored in the S3 bucket.

### Assessment Criteria
Our goal is not to fool you. On the contrary, we would like to see you in your best light! We value clean, DRY and documented code; and in the interest of full disclosure, our assessment criteria are outlined below (in order of significance):

1. Your ability to effectively solve the problems posed.
2. Your ability to solve these problems in a clear and logical manner, with tasteful design.
3. Your ability to appropriately document and comment your code.




# Project submission - Spark challenge

Author: Jakub Pitera

This notebook shows how the author built the ETL pipeline according to the submission requirements.
The last cell contains alternative code with suggestions on how to update the pipeline to load data into an AWS S3 bucket.

A stripped-down, cleaner version of the code, refactored into functions, is available in *'ETL.py'*; it can be used to run the pipeline from the console.

Files in the main directory:
- **takehomefile.ipynb** - Main Jupyter notebook with the assignment details. It shows how the author built the ETL pipeline with Spark according to the data preparation requirements.
- **ETL.py** - ETL code refactored into functions. Can be run from the terminal.
- **ETL_S3.py** - Alternative version of ETL.py in which the ETL steps were updated to load data to an AWS S3 bucket.
- **dl.cfg** - Template for storing the AWS credentials used when writing to S3.
- **airflow_dag_naive.py** - Uses Airflow to run the ETL tasks. A naive approach that demonstrates basic Airflow usage.
- **airflow_dag_sparky.py** - Uses Airflow to run the ETL as a Spark application. A more elegant approach; data is loaded to S3.
- **takehomefile_pandas.ipynb** - Alternative Jupyter notebook with the ETL converted to pandas.
- **requirements.txt** - List of required packages.
- **data.csv** - Raw dataset in CSV format.
- **state_abbreviations.csv** - Abbreviations data dictionary in CSV format.
- **enriched.csv** - Directory storing the final enriched output in CSV format.
- **enriched.snappy.parquet** - Directory storing the final enriched output in Snappy-compressed Parquet format.

## Initialization

In [1]:
# Import libraries
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, when, count, col, regexp_replace, trim, to_date, coalesce, length

In [2]:
# Create a SparkSession object running locally
spark = SparkSession.builder \
    .appName("InterviewChallenge") \
    .master("local[2]") \
    .config("spark.executor.memory", "2g") \
    .getOrCreate()

## Load dataset

In [3]:
# Read input data to spark dataframe
filepath = 'data.csv'

df = spark.read.option('header', True).csv(filepath)

# Keep a reference to the raw data (DataFrames are immutable, so reassigning df later does not change df_raw)
df_raw = df.alias('df_raw')

In [4]:
# Inspect if data has been loaded correctly
df.show(10)

+--------------------+--------------------+----------------+--------------------+-----------------+-----+-------+--------------------+--------------------+--------------------+--------------------+
|                name|              gender|       birthdate|             address|             city|state|zipcode|               email|                 bio|                 job|          start_date|
+--------------------+--------------------+----------------+--------------------+-----------------+-----+-------+--------------------+--------------------+--------------------+--------------------+
|       Leslee Corwin|                   M|      1974-02-01|    4933 Weber Walks|       Lake Carey|   KS|  32725|hansen.kennedy@ya...|At aut velit unde...|Education adminis...|               10/06|
|       Orris Kuvalis|                   M|      1997-01-08|     092 Kanye Forge|South Doshiamouth|   TN|  08955|nicky.brown@yahoo...|Corporis non haru...|                null|                null|
|         

In [5]:
df.select('name', 'gender', 'birthdate').show(100)

+--------------------+--------------------+--------------------+
|                name|              gender|           birthdate|
+--------------------+--------------------+--------------------+
|       Leslee Corwin|                   M|          1974-02-01|
|       Orris Kuvalis|                   M|          1997-01-08|
|                   "|    Industrial buyer|    Voluptatem odio.|
|        Afton Hirthe|                   M|          1970-08-25|
|       Olinda Wisoky|                   F|          2007-01-11|
|Dr. Annmarie Schm...|                   M|          1995-09-29|
|        Mathew Grady|                   F|          1973-04-30|
| Dolor eum et eve...|Designer, multimedia|               10/95|
|    Ailene Abernathy|                   F|          2002-06-19|
| Et laboriosam qu...|                null|                null|
|,Personal assista...|                null|                null|
|      Justen Carroll|                   M|          2003-08-27|
| Veniam rerum vol...|   

The two queries above show that the data was read incorrectly and the rows are misaligned. The text in the 'bio' column contains newlines, and each newline was interpreted as the start of a new row.
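To illustrate the failure mode outside Spark, here is a minimal sketch using Python's standard `csv` module on a tiny hypothetical sample (not taken from `data.csv`) whose quoted `bio` value spans two physical lines. Splitting on newlines breaks the record apart, while a quote-aware parser keeps it whole; Spark's `multiLine` option enables the equivalent behaviour for records that span lines.

```python
import csv
import io

# Hypothetical sample: the quoted 'bio' value contains an embedded newline
raw = 'name,bio,state\n"Leslee Corwin","First sentence.\n Second sentence.",KS\n'

# Line-based splitting, roughly what a reader without multiLine does
lines = raw.strip().split("\n")
print(len(lines) - 1)  # 2 data "rows", the second one being a fragment

# Quote-aware parsing keeps the newline inside the field, so there is one record
records = list(csv.DictReader(io.StringIO(raw)))
print(len(records))         # 1
print(records[0]["state"])  # KS
```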

In [6]:
# Reload dataframe using additional options
df = (spark.read
      .option("multiLine", True)  # parse quoted values that span several lines
      .option("header", True)
      .csv(filepath))

In [7]:
# Again inspect if data has been loaded correctly
df.show(10)

+--------------------+------+----------+--------------------+-----------------+-----+-------+--------------------+--------------------+--------------------+--------------------+
|                name|gender| birthdate|             address|             city|state|zipcode|               email|                 bio|                 job|          start_date|
+--------------------+------+----------+--------------------+-----------------+-----+-------+--------------------+--------------------+--------------------+--------------------+
|       Leslee Corwin|     M|1974-02-01|    4933 Weber Walks|       Lake Carey|   KS|  32725|hansen.kennedy@ya...|At aut velit unde...|Education adminis...|               10/06|
|       Orris Kuvalis|     M|1997-01-08|     092 Kanye Forge|South Doshiamouth|   TN|  08955|nicky.brown@yahoo...|Corporis non haru...|    Industrial buyer|    Voluptatem odio.|
|        Afton Hirthe|     M|1970-08-25|355 Shaquille Cen...|     Lorriborough|   OK|  12027|noelle.gibson@leb

In [8]:
df.select('name', 'gender', 'birthdate').show(100)

+--------------------+------+----------+
|                name|gender| birthdate|
+--------------------+------+----------+
|       Leslee Corwin|     M|1974-02-01|
|       Orris Kuvalis|     M|1997-01-08|
|        Afton Hirthe|     M|1970-08-25|
|       Olinda Wisoky|     F|2007-01-11|
|Dr. Annmarie Schm...|     M|1995-09-29|
|        Mathew Grady|     F|1973-04-30|
|    Ailene Abernathy|     F|2002-06-19|
|      Justen Carroll|     M|2003-08-27|
|       Nikita Torphy|     M|1979-08-02|
|   Murry Waelchi PhD|     F|2013-12-03|
|     Darrick Schultz|     F|1987-04-29|
|Mrs. Charlize Wil...|     F|1997-02-13|
|     Brett Hettinger|     F|1986-03-27|
|         Benito Wolf|     F|1990-05-15|
|        Aloys Parker|     M|1979-10-10|
|    Pallie Reilly MD|     M|2012-10-27|
|  Francisquita Boehm|     M|2004-12-05|
|     Colonel Johnson|     F|1987-05-11|
|       Marina Kemmer|     F|1979-06-14|
|  Miss Effie Hermann|     M|1982-02-09|
|      Wendi Schinner|     F|1977-12-02|
|     Dr. Liam F

Now all columns are parsed accurately.

## Inspecting dataset

In [9]:
# Schema
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- birthdate: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zipcode: string (nullable = true)
 |-- email: string (nullable = true)
 |-- bio: string (nullable = true)
 |-- job: string (nullable = true)
 |-- start_date: string (nullable = true)



In [10]:
# DTypes in pd formatting
pd.DataFrame(df.dtypes, columns=['Column', 'Dtype'])

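|   | Column    | Dtype  |
|---|-----------|--------|
| 0 | name      | string |
| 1 | gender    | string |
| 2 | birthdate | string |
| 3 | address   | string |
| 4 | city      | string |
| 5 | state     | string |
| 6 | zipcode   | string |
| 7 | email     | string |
| 8 | bio       | string |
| 9 | job       | string |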


In [11]:
# Column statistics
df.describe().toPandas()

|   | summary | name | gender | birthdate | address | city | state | zipcode | email | bio | job | start_date |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | count | 500 | 500 | 500 | 500 | 500 | 500 | 500.0 | 500 | 500 | 500 | 500 |
| 1 | mean | | | | | | | 48102.664 | | | | |
| 2 | stddev | | | | | | | 28926.117960826883 | | | | |
| 3 | min | Ace Terry V | F | 1970-01-13 | 00967 Cesar Isle Apt. 737 | Adisonburgh | AK | 601.0 | abagail.mclaughlin@hotmail.com | A et minima repudiandae. Reiciendis amet exped... | Academic librarian | 01/01/1995 |
| 4 | max | Zion Schuster | M | 2015-02-25 | 999 Albertha Stream Apt. 938 | Zoatown | WY | 99877.0 | zstokes@hotmail.com | Voluptatum tempora autem optio debitis velit a... | Wellsite geologist | Voluptatibus iste. |


In [12]:
# Dataframe shape
print("Dataframe row count: ", df.count())
print("Dataframe columns count: ", len(df.columns))

Dataframe row count:  500
Dataframe columns count:  11


In [13]:
# Number of missing (null or NaN) values in each column
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+----+------+---------+-------+----+-----+-------+-----+---+---+----------+
|name|gender|birthdate|address|city|state|zipcode|email|bio|job|start_date|
+----+------+---------+-------+----+-----+-------+-----+---+---+----------+
|   0|     0|        0|      0|   0|    0|      0|    0|  0|  0|         0|
+----+------+---------+-------+----+-----+-------+-----+---+---+----------+



## Transformation

### String cleaning
The bio field contains text with arbitrary padding, spacing and line breaks. Normalize these values to a space-delimited string.

In [14]:
# Inspect 'bio' column
df.select('bio').show(100)

+--------------------+
|                 bio|
+--------------------+
|At aut velit unde...|
|Corporis non haru...|
|Sed vitae dolorem...|
|Nostrum impedit n...|
|Commodi quia face...|
|Non omnis fugit m...|
|Eius quas expedit...|
|Optio molestias a...|
|Recusandae quod s...|
|Perspiciatis aut ...|
|Ut voluptate mini...|
|Quis illo unde si...|
|Neque dolor inven...|
|Adipisci accusamu...|
|Et aliquid molest...|
|Odio numquam rati...|
|Inventore rerum m...|
|Officiis ex debit...|
|Laudantium ea aut...|
|Inventore porro r...|
|Iusto vel libero ...|
|Repudiandae aut d...|
|Voluptatem placea...|
|Voluptas et volup...|
|Vitae sint nisi a...|
|Et alias deserunt...|
|Labore qui autem ...|
|Modi est consequa...|
|Ratione sapiente ...|
|Corporis omnis of...|
|Beatae nam dolore...|
|Placeat quod atqu...|
|Et necessitatibus...|
|Ipsa sed dolorem ...|
|Iusto dolores qua...|
|Est asperiores es...|
|Porro debitis ill...|
|Ut laudantium vol...|
|Est non omnis qui...|
|Ducimus praesenti...|
|Nulla numq

In [15]:
# Print the full, untruncated values of the first n rows
n = 10
dfpd = df.select('bio').limit(n).toPandas()

for i in range(0,n):
    print(i, dfpd.iloc[i,0])

0 At aut velit unde minus recusandae molestias. Est maxime labore nostrum.	 Vero debitis neque doloremque accusantium incidunt corporis et et.
1 Corporis non harum doloribus ab provident.	 Alias autem error id modi saepe. Ut delectus fugit dolores.
     
2 Sed vitae dolorem quae totam sequi fuga odit.	        Eaque alias quisquam blanditiis veniam. Aut perferendis sint deleniti accusamus quod.	
3 Nostrum impedit nulla vero ullam ad repudiandae. Excepturi praesentium tempore aspernatur ea est.	 Ipsa alias molestiae rerum omnis voluptates ut.	    
4 Commodi quia facere dolores facere. Sed culpa sit quo.	 Exercitationem error aut odio possimus.
5 Non omnis fugit molestias.
 Dolor eum et eveniet soluta eum. Placeat sapiente temporibus perspiciatis tempora quae quia.		
6 Eius quas expedita ut culpa doloribus.
 Et laboriosam quidem repellendus eveniet a. Nostrum soluta corporis doloremque sint est excepturi quisquam.	

7 Optio molestias accusamus quos aut beatae laudantium qui.	
 Veniam reru

In [16]:
# Collapse every run of whitespace (spaces, tabs, newlines, carriage returns) into a single space,
# then strip leading and trailing spaces
df = (df.withColumn('bio', regexp_replace(col('bio'), r'\s+', ' '))
        .withColumn('bio', trim(col('bio'))))
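The same normalization can be sanity-checked outside Spark with Python's `re` module. This is a minimal sketch, not the notebook's code: `normalize_bio` is a hypothetical helper, and Python's `\s` also matches some non-ASCII whitespace (e.g. a non-breaking space) that Java's `\s` in Spark does not, so the two agree only on ASCII whitespace.

```python
import re

def normalize_bio(text):
    """Collapse runs of whitespace into single spaces and strip the ends (hypothetical helper)."""
    if text is None:
        return None  # Spark's regexp_replace also leaves nulls as null
    return re.sub(r"\s+", " ", text).strip()

sample = "Non omnis fugit molestias.\n Dolor eum et eveniet soluta eum.\t\t"
print(normalize_bio(sample))
# Non omnis fugit molestias. Dolor eum et eveniet soluta eum.
```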

In [17]:
# Confirm the changes were applied
df.select('bio').show(100)

+--------------------+
|                 bio|
+--------------------+
|At aut velit unde...|
|Corporis non haru...|
|Sed vitae dolorem...|
|Nostrum impedit n...|
|Commodi quia face...|
|Non omnis fugit m...|
|Eius quas expedit...|
|Optio molestias a...|
|Recusandae quod s...|
|Perspiciatis aut ...|
|Ut voluptate mini...|
|Quis illo unde si...|
|Neque dolor inven...|
|Adipisci accusamu...|
|Et aliquid molest...|
|Odio numquam rati...|
|Inventore rerum m...|
|Officiis ex debit...|
|Laudantium ea aut...|
|Inventore porro r...|
|Iusto vel libero ...|
|Repudiandae aut d...|
|Voluptatem placea...|
|Voluptas et volup...|
|Vitae sint nisi a...|
|Et alias deserunt...|
|Labore qui autem ...|
|Modi est consequa...|
|Ratione sapiente ...|
|Corporis omnis of...|
|Beatae nam dolore...|
|Placeat quod atqu...|
|Et necessitatibus...|
|Ipsa sed dolorem ...|
|Iusto dolores qua...|
|Est asperiores es...|
|Porro debitis ill...|
|Ut laudantium vol...|
|Est non omnis qui...|
|Ducimus praesenti...|
|Nulla numq

In [18]:
# Print the full, untruncated values of the first n rows
n = 10
dfpd = df.select('bio').limit(n).toPandas()

for i in range(0,n):
    print(i, dfpd.iloc[i,0])

0 At aut velit unde minus recusandae molestias. Est maxime labore nostrum. Vero debitis neque doloremque accusantium incidunt corporis et et.
1 Corporis non harum doloribus ab provident. Alias autem error id modi saepe. Ut delectus fugit dolores.
2 Sed vitae dolorem quae totam sequi fuga odit. Eaque alias quisquam blanditiis veniam. Aut perferendis sint deleniti accusamus quod.
3 Nostrum impedit nulla vero ullam ad repudiandae. Excepturi praesentium tempore aspernatur ea est. Ipsa alias molestiae rerum omnis voluptates ut.
4 Commodi quia facere dolores facere. Sed culpa sit quo. Exercitationem error aut odio possimus.
5 Non omnis fugit molestias. Dolor eum et eveniet soluta eum. Placeat sapiente temporibus perspiciatis tempora quae quia.
6 Eius quas expedita ut culpa doloribus. Et laboriosam quidem repellendus eveniet a. Nostrum soluta corporis doloremque sint est excepturi quisquam.
7 Optio molestias accusamus quos aut beatae laudantium qui. Veniam rerum voluptatibus beatae et facere 

### Code swap 
There is a supplementary CSV in the files folder under `state_abbreviations`. This "data dictionary" contains state abbreviations alongside state names. For the state field of the input CSV, replace each state abbreviation with its associated state name from the data dictionary.

In [19]:
# Option 1 - Creating state abbreviations dict using pandas
abbrv_dict = pd.read_csv('state_abbreviations.csv', index_col='state_abbr').to_dict()['state_name']

# Replace 'state' col using dictionary (uncomment line to run this option on dataframe)
# df = df.replace(abbrv_dict, subset='state')

In [20]:
# Option 2 - Creating state abbreviations dict using csv
import csv

with open('state_abbreviations.csv', 'r', newline='') as f:
    next(f)  # skip header line
    abbrv_dict = {key: val for key, val in csv.reader(f)}

# Replace 'state' col using dictionary (uncomment line to run this option on dataframe)
# df = df.replace(abbrv_dict, subset='state')

In [21]:
# Option 3 - Using spark. Replacing values by joining a dataframe

# read lookup values into spark dataframe
abbr_filepath = 'state_abbreviations.csv'

abbr_df = spark.read.option('header', True).csv(abbr_filepath)

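# Replace 'state' col using the joined abbreviations df. coalesce() keeps the original value
# when an abbreviation has no match, and withColumn() keeps 'state' in its original position
df = (df
    .join(abbr_df, df.state == abbr_df.state_abbr, how='left')
    .withColumn('state', coalesce(col('state_name'), col('state')))
    .drop('state_abbr', 'state_name'))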
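Whichever option is used, it is worth deciding what happens to an abbreviation missing from the dictionary. `DataFrame.replace` leaves unmatched values unchanged; the plain-Python sketch below shows the same fallback lookup. The three-row CSV is a hypothetical excerpt standing in for `state_abbreviations.csv`, whose `state_abbr`/`state_name` header matches the one used in Option 1.

```python
import csv
import io

# Hypothetical excerpt of state_abbreviations.csv
abbr_csv = "state_abbr,state_name\nKS,Kansas\nTN,Tennessee\nOK,Oklahoma\n"
abbrv_dict = {row["state_abbr"]: row["state_name"]
              for row in csv.DictReader(io.StringIO(abbr_csv))}

def swap_state(abbr):
    """Map an abbreviation to its full name, keeping the original value if it is unknown."""
    return abbrv_dict.get(abbr, abbr)

print([swap_state(s) for s in ["KS", "TN", "ZZ"]])  # ['Kansas', 'Tennessee', 'ZZ']
```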



In [22]:
# Confirming results by inspecting 'state' col
df.select('state').show()

+--------------+
|         state|
+--------------+
|        Kansas|
|     Tennessee|
|      Oklahoma|
|      Nebraska|
|North Carolina|
|  South Dakota|
|     Minnesota|
|      Maryland|
| New Hampshire|
|   Connecticut|
|       Arizona|
|       Vermont|
|      Missouri|
|          Iowa|
|  North Dakota|
|         Texas|
| West Virginia|
|      Illinois|
|     Tennessee|
|   Mississippi|
+--------------+
only showing top 20 rows



### Date offset
The start_date field contains data in a variety of formats. These may include e.g., "June 23, 1912" or "5/11/1930" (month, day, year). But not all values are valid dates. Invalid dates may include e.g., "June 2018", "3/06" (incomplete dates) or even arbitrary natural language. Add a start_date_description field adjacent to the start_date column to filter invalid date values into. Normalize all valid date values in start_date to ISO 8601 (i.e., YYYY-MM-DD).

In [23]:
# Inspect 'start_date' col
df.select('start_date').show(200)

+--------------------+
|          start_date|
+--------------------+
|               10/06|
|    Voluptatem odio.|
|Est sed et suscipit.|
|               06/71|
|               05/02|
|               10/95|
|      Ea nostrum et.|
|          10/20/1994|
|               09/74|
|Quibusdam similique.|
|          09/21/1977|
|       December 1999|
|       July 11, 1995|
| Consequuntur rerum.|
|        October 1973|
|      September 1998|
| Repellat accusamus.|
|      March 12, 2000|
|          1999-09-15|
|          1970-02-19|
|  September 18, 1988|
|       Eum adipisci.|
|               02/07|
|           July 1994|
| Voluptates tempore.|
|               03/80|
|          1991-07-22|
|            May 1988|
|               10/02|
|               11/97|
|          March 2005|
|   Et tempore rerum.|
|Accusamus quia odit.|
|  Voluptatibus iste.|
|          April 2015|
|        Fugiat quia.|
|               04/01|
|       February 1993|
|      Sed quo velit.|
|Voluptatem quia qui.|
| Ut debiti

Three valid date formats were identified:
 - "MM/DD/YYYY" (e.g. "10/20/1994")
 - "YYYY-MM-DD" (e.g. "1999-09-15")
 - "Month D, YYYY" (e.g. "July 11, 1995")

In [24]:
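# Extend to_date() to try several formats and keep the first successful parse
def to_date_(column, formats=("MM/d/y", "y-MM-d", "MMMM d, y")):
    """Return a DateType column parsed with the first matching format, or null if none match."""
    return coalesce(*[to_date(column, f) for f in formats])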

In [25]:
# Create 'sd' col by running to_date_() on 'start_date' col
df = df.withColumn('sd', to_date_('start_date'))

In [26]:
# Use 'sd' col to create 'start_date_description' col describing if 'start_date' vals are Valid or Invalid
df = df.withColumn('start_date_description', when(col('sd').isNull(), "Invalid").otherwise("Valid"))

In [27]:
# Valid values in 'start_date' are normalized to ISO 8601, while invalid values are kept unchanged
# ('start_date_description' flags which is which). The staging 'sd' column holds the same dates
# with invalid values set to null, in case nulls are preferred for invalid entries.
df = df.withColumn('start_date', coalesce(col('sd').cast('string'), col('start_date')))
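The date logic (try each format, take the first success, otherwise flag the value as invalid) can be mirrored in plain Python to reason about edge cases. This is a sketch, not the Spark code: the `strptime` patterns are an assumed mapping of the three Spark patterns, `%B` assumes an English locale, and `strptime`'s `%m`/`%d` accept one or two digits, so it may be more lenient than Spark's `MM`.

```python
from datetime import datetime

# Assumed strptime equivalents of "MM/d/y", "y-MM-d" and "MMMM d, y"
DATE_FORMATS = ("%m/%d/%Y", "%Y-%m-%d", "%B %d, %Y")

def parse_start_date(value):
    """Return a date for the first matching format, or None (like coalesce over to_date)."""
    if value is None:
        return None
    for fmt in DATE_FORMATS:
        try:
            return datetime.strptime(value.strip(), fmt).date()
        except ValueError:
            continue  # try the next format
    return None

def normalize_start_date(value):
    """Return (start_date, start_date_description) following the notebook's convention."""
    parsed = parse_start_date(value)
    if parsed is None:
        return value, "Invalid"           # invalid values are kept unchanged
    return parsed.isoformat(), "Valid"    # ISO 8601: YYYY-MM-DD

for raw in ["July 11, 1995", "10/20/1994", "June 2018", "3/06"]:
    print(raw, "->", normalize_start_date(raw))
```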

In [28]:
# Confirming results by inspecting dataframe
df.select('start_date', 'start_date_description', 'sd').show(500)

+--------------------+----------------------+----------+
|          start_date|start_date_description|        sd|
+--------------------+----------------------+----------+
|               10/06|               Invalid|      null|
|    Voluptatem odio.|               Invalid|      null|
|Est sed et suscipit.|               Invalid|      null|
|               06/71|               Invalid|      null|
|               05/02|               Invalid|      null|
|               10/95|               Invalid|      null|
|      Ea nostrum et.|               Invalid|      null|
|          1994-10-20|                 Valid|1994-10-20|
|               09/74|               Invalid|      null|
|Quibusdam similique.|               Invalid|      null|
|          1977-09-21|                 Valid|1977-09-21|
|       December 1999|               Invalid|      null|
|          1995-07-11|                 Valid|1995-07-11|
| Consequuntur rerum.|               Invalid|      null|
|        October 1973|         

## QC check
Direct checks on the processed columns, serving as a sanity check that the data preparation succeeded.

### 'bio' column normalization

In [29]:
# Tests that the 'bio' column no longer contains tabs, newlines, repeated spaces,
# or leading/trailing whitespace

# Number of records in 'bio' col that still contain undesired whitespace
bio_mask = col('bio').rlike(r'\s{2,}|[\t\n\r\f]|^\s|\s$')
bio_count = df.filter(bio_mask).count()

# Check results
if bio_count == 0:
    bio_check = True
    print('QC: bio column normalization - Passed')
else:
    bio_check = False
    print('QC: bio column normalization - FAILED')

QC: bio column normalization - Passed


### 'state' column abbreviation swap

In [30]:
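# Tests that every 'state' value is longer than 2 characters (no abbreviations remain)
# and that no state was lost (null) during the swap

# Number of records in 'state' col that are null or at most 2 characters long
state_mask = col('state').isNull() | (length(col('state')) < 3)
state_count = df.filter(state_mask).count()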

# Check results
if state_count == 0:
    state_check = True
    print('QC: state column abbreviation swap - Passed')
else:
    state_check = False
    print('QC: state column abbreviation swap - FAILED')

QC: state column abbreviation swap - Passed


### date offset
Disclaimer: this step is hard to verify numerically. The result depends on whether all valid formats present in the raw column were passed to `to_date_()`.

In [31]:
# Tests if 'sd' staging column dtype is DateType
from pyspark.sql.types import DateType

date_check1 = isinstance(df.schema['sd'].dataType, DateType)

# Check results
if date_check1:
    print('QC: date offset - "sd" column is DateType - Passed')
else:
    print('QC: date offset - "sd" column is DateType - FAILED')

QC: date offset - "sd" column is DateType - Passed


In [32]:
# Tests if all valid dates found were converted to ISO 8601 'YYYY-MM-DD' standard

# Number of records with initial date format described as valid 
valid_count = df.filter(col('start_date_description') == 'Valid').count()
# Number of records in ISO 8601 format in 'start_date' col
isodate_count = df.filter(to_date(col('start_date'),'y-MM-d').isNotNull()).count()

# Check results
date_check2 = valid_count == isodate_count
if date_check2:
    print('QC: date offset - All Valid dates converted to ISO 8601 - Passed')
else:
    print('QC: date offset - All Valid dates converted to ISO 8601 - FAILED')

QC: date offset - All Valid dates converted to ISO 8601 - Passed


In [33]:
# Additional visual check on a random sample (roughly 10% of rows)
df.select('start_date', 'start_date_description','sd').sample(withReplacement=False, fraction=0.1).show(50)

+--------------------+----------------------+----------+
|          start_date|start_date_description|        sd|
+--------------------+----------------------+----------+
|          1999-09-15|                 Valid|1999-09-15|
|       February 1993|               Invalid|      null|
|               10/94|               Invalid|      null|
|       December 1996|               Invalid|      null|
|    Est et deserunt.|               Invalid|      null|
|Ad natus nulla modi.|               Invalid|      null|
|          1970-04-09|                 Valid|1970-04-09|
| Placeat non et qui.|               Invalid|      null|
|       Perspiciatis.|               Invalid|      null|
|Neque iste repellat.|               Invalid|      null|
|     Ab repudiandae.|               Invalid|      null|
|               08/90|               Invalid|      null|
|        October 1975|               Invalid|      null|
|          2001-02-05|                 Valid|2001-02-05|
|      Ut harum enim.|         

### QC results

In [36]:
# Print SUCCESS if all checks passed
if all((bio_check, state_check, date_check1, date_check2)):
    print("QC Results: SUCCESS")
else:
    print("QC Results: FAILURE")

QC Results: SUCCESS
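The four QC cells repeat the same print-and-flag pattern. Since the assessment values DRY code, one option is a small helper such as the hypothetical `report_check` below; this is a sketch of a possible refactor, not code from the notebook or `ETL.py`.

```python
def report_check(name, passed):
    """Print one QC line in the notebook's format and return the result (hypothetical helper)."""
    print(f"QC: {name} - {'Passed' if passed else 'FAILED'}")
    return passed

def summarize_checks(results):
    """Overall verdict, equivalent to the all(...) test in the QC results cell."""
    return "QC Results: SUCCESS" if all(results) else "QC Results: FAILURE"

# In the notebook this would read e.g.:
#   bio_check = report_check('bio column normalization', bio_count == 0)
results = [
    report_check("bio column normalization", True),
    report_check("state column abbreviation swap", False),
]
print(summarize_checks(results))  # QC Results: FAILURE
```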


## Save 

In [None]:
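# Write enriched dataframe to csv and snappy.parquet
output_name_csv = 'enriched.csv'
output_name_parquet = 'enriched.snappy.parquet'

# Drop the staging 'sd' column and place 'start_date_description' right after 'start_date'
output_cols = ['name', 'gender', 'birthdate', 'address', 'city', 'state', 'zipcode',
               'email', 'bio', 'job', 'start_date', 'start_date_description']
df_out = df.select(*output_cols)

# mode('overwrite') replaces a previous run's output; without it the write fails if the path exists
df_out.write.mode('overwrite').option('header', True).option('delimiter', ',').csv(output_name_csv)
df_out.write.mode('overwrite').option("compression", "snappy").parquet(output_name_parquet)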

### Alternative  - writing to S3 bucket

Below is a proposal for writing the output to an AWS S3 bucket. The code outlines the changes required in the ETL above and is not meant to be run as-is.
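The code reads credentials from a `[USER]` section of `dl.cfg` with the keys `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`. A minimal sketch of that layout, parsed with `configparser.read_string` so it can be checked without a real file; the values are placeholders.

```python
import configparser

# Placeholder dl.cfg contents matching the section and keys read by the ETL
DL_CFG_TEMPLATE = """
[USER]
AWS_ACCESS_KEY_ID = <your-access-key-id>
AWS_SECRET_ACCESS_KEY = <your-secret-access-key>
"""

config = configparser.ConfigParser()
config.read_string(DL_CFG_TEMPLATE)

# Option names are case-insensitive in configparser, so the upper-case lookups work
print(config.get("USER", "AWS_ACCESS_KEY_ID"))  # <your-access-key-id>
```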

In [None]:
# Additional imports
import configparser
import os

# Read AWS credentials from 'dl.cfg' config file
config = configparser.ConfigParser()
config.read('dl.cfg')

# Save credentials as environment variables 
os.environ['AWS_ACCESS_KEY_ID']=config.get('USER', 'AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY']=config.get('USER', 'AWS_SECRET_ACCESS_KEY')

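# Create a SparkSession running locally with the S3A connector on the classpath.
# Run this in a fresh Python process: if a session already exists, getOrCreate()
# returns it and spark.jars.packages is ignored.
# The hadoop-aws version must match the Hadoop version bundled with your Spark build
# (e.g. 3.3.2 for Spark 3.3.x; check your installation).
spark = (SparkSession.builder
    .appName("InterviewChallenge")
    .master("local[2]")
    .config("spark.executor.memory", "2g")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.2")  # S3A connector
    .getOrCreate())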

# Run etl to transform data set as previously

# Write to AWS S3 bucket
output_name_s3_csv = 's3a://<bucket_name>/enriched.csv'
output_name_s3_parquet = 's3a://<bucket_name>/enriched.snappy.parquet'

df.write.mode('overwrite').option('header', True).option('delimiter', ',').csv(output_name_s3_csv)
df.write.mode('overwrite').option("compression","snappy").parquet(output_name_s3_parquet)
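Since the pipeline is assumed to run monthly, one possible storage layout is a separate Hive-style `year=/month=` prefix per run, which Spark's partition discovery turns into `year` and `month` columns when the parent path is read. The helper, bucket and dataset names below are hypothetical; this is a sketch of one layout, not the author's design.

```python
from datetime import date

def monthly_output_path(bucket, dataset, run_date, scheme="s3a"):
    """Build a Hive-style year=/month= prefix for one monthly run (hypothetical layout)."""
    return f"{scheme}://{bucket}/{dataset}/year={run_date.year:04d}/month={run_date.month:02d}/"

# Hypothetical bucket and dataset names
print(monthly_output_path("my-data-lake", "enriched_parquet", date(2023, 5, 1)))
# s3a://my-data-lake/enriched_parquet/year=2023/month=05/
```

Writing each run with `mode('overwrite')` to its own month prefix means a rerun replaces only that month. An alternative is to add `year` and `month` columns and write with `df.write.partitionBy('year', 'month')`, using dynamic partition overwrite (`spark.sql.sources.partitionOverwriteMode=dynamic`) so that a rerun does not delete other months.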
