### 0.1 Setup

In [1]:
from pyspark.sql import SparkSession

#Create SparkSession
spark = SparkSession.builder.appName('govtech_challenge').getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/11/26 11:05:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
spark

In [3]:
spark.sparkContext.getConf().getAll()

[('spark.driver.extraJavaOptions',
  '-XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED'),
 ('spark.executor.id', 'driver'),
 ('spark.app.name', 'govtech_challenge'),
 ('spark.app.startTime', '1669431910567'),
 ('spark.app.id', 'local-1669431911555'),
 ('spark.driver.port', '54685'),
 ('spark.rdd.compress', 'True'),
 ('spa

### 0.2 Task

Application mobile number is 8 digits Applicant is over 18 years old as of 1 Jan 2022 Applicant has a valid email (email ends with @emailprovider.com or @emailprovider.net) You are required to format datasets in the following manner:

Split name into first_name and last_name Format birthday field into YYYYMMDD Remove any rows which do not have a name field (treat this as unsuccessful applications) Create a new field named above_18 based on the applicant's birthday Membership IDs for successful applications should be the user's last name, followed by a SHA256 hash of the applicant's birthday, truncated to first 5 digits of hash (i.e <last_name>_<hash(YYYYMMDD)>) You are required to consolidate these datasets and output the successful applications into a folder, which will be picked up by downstream engineers. Unsuccessful applications should be condolidated and dropped into a separate folder.

You can use common scheduling solutions such as cron or airflow to implement the scheduling component. Please provide a markdown file as documentation.

### 0.3 Read

In [82]:
import os
upstream_dir = "/Users/tonyngmk/repo/Data-Engineer-Tech-Challenge/1-data-pipelines/data/input"
upstream_csv = [f for f in os.listdir(upstream_dir) if f.endswith(".csv")]
upstream_csv

['applications_dataset_1.csv', 'applications_dataset_2.csv']

In [276]:
from pyspark.sql.types import StructType, StringType, ByteType
from pyspark.sql.functions import lit

defined_schema = StructType() \
.add("name", StringType(), True) \
.add("email", StringType(), True) \
.add("date_of_birth", StringType(), True) \
.add("mobile_no", StringType(), True) \

sdf = spark.read.format("csv").load(upstream_dir, header=True, schema=defined_schema) # read entire directory
# problematic if directory is used for multiple input source of different schema

In [277]:
sdf.printSchema()

row = sdf.count()
col = len(sdf.columns)
print(f"Size: {row}*{col}") 

root
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- date_of_birth: string (nullable = true)
 |-- mobile_no: string (nullable = true)

Size: 4999*4


In [279]:
sdf = sdf.withColumn('is_successful', lit(None).cast(ByteType())) # add new col
sdf.limit(5).show(truncate=False) # Peek

+---------------+-----------------------------+-------------+---------+-------------+
|name           |email                        |date_of_birth|mobile_no|is_successful|
+---------------+-----------------------------+-------------+---------+-------------+
|Tony Shepherd  |Tony_Shepherd@petersen.com   |07/03/2016   |711447   |null         |
|Sherry Gonzalez|Sherry_Gonzalez@caldwell.biz |14-03-1973   |66744895 |null         |
|Ashlee Austin  |Ashlee_Austin@melendez.com   |12/09/1992   |6454197  |null         |
|David Brown    |David_Brown@jackson-smith.biz|2001-09-22   |69082983 |null         |
|Marc Meyer     |Marc_Meyer@chavez.com        |1996/05/17   |9727376  |null         |
+---------------+-----------------------------+-------------+---------+-------------+



In [305]:
# Final function
def read(upstream_dir):
    """
    read folder that specifically contains data of
    membership applications of e-commerce platform
    """
    defined_schema = StructType() \
    .add("name", StringType(), True) \
    .add("email", StringType(), True) \
    .add("date_of_birth", StringType(), True) \
    .add("mobile_no", StringType(), True)

    sdf = spark.read.format("csv").load(upstream_dir, header=True, schema=defined_schema)
    sdf = sdf.withColumn('is_successful', lit(1).cast(ByteType())) # add new col
    return sdf

sdf = read("/Users/tonyngmk/repo/Data-Engineer-Tech-Challenge/1-data-pipelines/data/input")

### 1. `mobile_no`
- 8 digits

In [336]:
from pyspark.sql.functions import length, regexp_extract, when

sdf2 = sdf.withColumn("mobile_no", regexp_extract(sdf.mobile_no, r'\d+', 0).alias('mobile_no'))

condition = length(sdf2.mobile_no) == 8
sdf2 = sdf2.withColumn("condition", when(condition, 1).otherwise(0))
sdf2 = sdf2.withColumn("is_successful", sdf2.is_successful.bitwiseAND(sdf2.condition).alias('is_successful'))
sdf2 = sdf2.drop('condition')
sdf2.limit(5).show(truncate=False)

print(f"Passed: {sdf2.filter(condition).count()}/{sdf.count()}")
print(f"Remaining: {sdf2.filter('is_successful=1').count()}/{sdf.count()}")

+---------------+-----------------------------+-------------+---------+-------------+
|name           |email                        |date_of_birth|mobile_no|is_successful|
+---------------+-----------------------------+-------------+---------+-------------+
|Tony Shepherd  |Tony_Shepherd@petersen.com   |07/03/2016   |711447   |0            |
|Sherry Gonzalez|Sherry_Gonzalez@caldwell.biz |14-03-1973   |66744895 |1            |
|Ashlee Austin  |Ashlee_Austin@melendez.com   |12/09/1992   |6454197  |0            |
|David Brown    |David_Brown@jackson-smith.biz|2001-09-22   |69082983 |1            |
|Marc Meyer     |Marc_Meyer@chavez.com        |1996/05/17   |9727376  |0            |
+---------------+-----------------------------+-------------+---------+-------------+

Passed: 937/4999
Remaining: 937/4999


### 2. `date_of_birth`

- format to yyyyMMdd
- over 18 years old as of 1 Jan 2022
- In other words, assert DOB < 1 Jan 2004

In [226]:
from pyspark.sql.functions import coalesce, to_date, date_format

def parse_date(date_col):
    """
    parse all date formats from str to date type
    """
    date_formats = ["yyyy-MM-dd",
                    "yyyy MM dd",
                    "MM/dd/yyyy",
                    "yyyy/MM/dd",
                    "dd-MM-yyyy"]
    return coalesce(*[to_date(date_col, format) for format in date_formats])

sdf3 = sdf2.withColumn("date_of_birth2", parse_date(sdf.date_of_birth).alias('date_of_birth2'))
sdf3.filter(sdf3.date_of_birth2.isNull()).limit(5).show(truncate=False) # check for remaining unparsed

+----+-----+-------------+---------+--------------+
|name|email|date_of_birth|mobile_no|date_of_birth2|
+----+-----+-------------+---------+--------------+
+----+-----+-------------+---------+--------------+



In [348]:
from dateutil.relativedelta import relativedelta
import datetime

sdf3 = sdf2.withColumn("date_of_birth", parse_date(sdf.date_of_birth).alias('date_of_birth'))

acceptable_date_from = (datetime.date(2022, 1, 1) - relativedelta(years=18)).strftime('%Y-%m-%d')
condition = sdf3.date_of_birth < (lit(acceptable_date_from))
sdf3 = sdf3.withColumn("condition", when(condition, 1).otherwise(0))
sdf3 = sdf3.withColumn("is_successful", sdf3.is_successful.bitwiseAND(sdf3.condition).alias('is_successful'))
sdf3 = sdf3.drop('condition')
sdf3.limit(5).show(truncate=False)

print(f"Passed: {sdf3.filter(condition).count()}/{sdf.count()}")
print(f"Remaining: {sdf3.filter('is_successful=1').count()}/{sdf.count()}")

+---------------+-----------------------------+-------------+---------+-------------+
|name           |email                        |date_of_birth|mobile_no|is_successful|
+---------------+-----------------------------+-------------+---------+-------------+
|Tony Shepherd  |Tony_Shepherd@petersen.com   |2016-07-03   |711447   |0            |
|Sherry Gonzalez|Sherry_Gonzalez@caldwell.biz |1973-03-14   |66744895 |1            |
|Ashlee Austin  |Ashlee_Austin@melendez.com   |1992-12-09   |6454197  |0            |
|David Brown    |David_Brown@jackson-smith.biz|2001-09-22   |69082983 |1            |
|Marc Meyer     |Marc_Meyer@chavez.com        |1996-05-17   |9727376  |0            |
+---------------+-----------------------------+-------------+---------+-------------+

Passed: 3935/4999
Remaining: 758/4999


In [353]:
# Date format
sdf3 = sdf3.withColumn("date_of_birth", date_format(sdf3.date_of_birth, "yyyyMMdd").alias('date_of_birth'))
sdf3.limit(5).show(truncate=False)

+---------------+-----------------------------+-------------+---------+-------------+---------+
|name           |email                        |date_of_birth|mobile_no|is_successful|condition|
+---------------+-----------------------------+-------------+---------+-------------+---------+
|Tony Shepherd  |Tony_Shepherd@petersen.com   |null         |711447   |0            |1        |
|Sherry Gonzalez|Sherry_Gonzalez@caldwell.biz |null         |66744895 |1            |0        |
|Ashlee Austin  |Ashlee_Austin@melendez.com   |null         |6454197  |0            |1        |
|David Brown    |David_Brown@jackson-smith.biz|null         |69082983 |1            |0        |
|Marc Meyer     |Marc_Meyer@chavez.com        |null         |9727376  |0            |1        |
+---------------+-----------------------------+-------------+---------+-------------+---------+



### 3. `email`
- Applicant has a valid email (email ends with @emailprovider.com or @emailprovider.net)

In [355]:

sdf4 = sdf3
condition = sdf4.email.rlike(email_regex_pattern)
sdf4 = sdf4.withColumn("condition", when(condition, 1).otherwise(0))
sdf4 = sdf4.withColumn("is_successful", sdf4.is_successful.bitwiseAND(sdf4.condition).alias('is_successful'))
sdf4 = sdf4.drop('condition')
sdf4.limit(5).show(truncate=False)

print(f"Passed: {sdf3.filter(condition).count()}/{sdf.count()}")
print(f"Remaining: {sdf4.filter('is_successful=1').count()}/{sdf.count()}")

+---------------+-----------------------------+-------------+---------+-------------+
|name           |email                        |date_of_birth|mobile_no|is_successful|
+---------------+-----------------------------+-------------+---------+-------------+
|Tony Shepherd  |Tony_Shepherd@petersen.com   |null         |711447   |0            |
|Sherry Gonzalez|Sherry_Gonzalez@caldwell.biz |null         |66744895 |0            |
|Ashlee Austin  |Ashlee_Austin@melendez.com   |null         |6454197  |0            |
|David Brown    |David_Brown@jackson-smith.biz|null         |69082983 |0            |
|Marc Meyer     |Marc_Meyer@chavez.com        |null         |9727376  |0            |
+---------------+-----------------------------+-------------+---------+-------------+

Passed: 3523/4999
Remaining: 526/4999


### 4. `name`

- Split name into `first_name` and `last_name`
- Remove rows that does not have a name

In [359]:
first_last_name_regex = r'([A-Za-z]+)\s+([A-Za-z]+)'

sdf5 = sdf4\
.withColumn("first_name", regexp_extract(sdf4.name, first_last_name_regex, 1).alias('first_name'))\
.withColumn("last_name", regexp_extract(sdf4.name, first_last_name_regex, 2).alias('last_name'))

condition = trim(sdf5.name) != ''

Column<'(NOT (trim(name) = ))'>

In [360]:
from pyspark.sql.functions import length, regexp_extract, trim
# from pyspark.sql import functions as F

first_last_name_regex = r'([A-Za-z]+)\s+([A-Za-z]+)'

sdf5 = sdf4\
.withColumn("first_name", regexp_extract(sdf4.name, first_last_name_regex, 1).alias('first_name'))\
.withColumn("last_name", regexp_extract(sdf4.name, first_last_name_regex, 2).alias('last_name'))


condition = trim(sdf5.name) != ''
sdf5 = sdf5.withColumn("condition", when(condition, 1).otherwise(0))
sdf5 = sdf5.withColumn("is_successful", sdf5.is_successful.bitwiseAND(sdf5.condition).alias('is_successful'))
sdf5 = sdf5.drop('condition')
sdf5.limit(5).show(truncate=False)
# sdf5 = sdf5.filter("trim(name) != ''")

# sdf5 = sdf4.withColumn('output', F.expr(r"regexp_extract_all(name, '([A-Za-z]+)\s+([A-Za-z]+)', 1)"))

print(f"Passed: {sdf4.filter(condition).count()}/{sdf.count()}")
print(f"Remaining: {sdf5.filter('is_successful=1').count()}/{sdf.count()}")

+---------------+-----------------------------+-------------+---------+-------------+----------+---------+
|name           |email                        |date_of_birth|mobile_no|is_successful|first_name|last_name|
+---------------+-----------------------------+-------------+---------+-------------+----------+---------+
|Tony Shepherd  |Tony_Shepherd@petersen.com   |null         |711447   |0            |Tony      |Shepherd |
|Sherry Gonzalez|Sherry_Gonzalez@caldwell.biz |null         |66744895 |0            |Sherry    |Gonzalez |
|Ashlee Austin  |Ashlee_Austin@melendez.com   |null         |6454197  |0            |Ashlee    |Austin   |
|David Brown    |David_Brown@jackson-smith.biz|null         |69082983 |0            |David     |Brown    |
|Marc Meyer     |Marc_Meyer@chavez.com        |null         |9727376  |0            |Marc      |Meyer    |
+---------------+-----------------------------+-------------+---------+-------------+----------+---------+

Passed: 4999/4999
Remaining: 526/499

In [361]:
sdf5 = sdf5.drop('name')
sdf5.printSchema()

root
 |-- email: string (nullable = true)
 |-- date_of_birth: string (nullable = true)
 |-- mobile_no: string (nullable = true)
 |-- is_successful: integer (nullable = false)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)



### Implement unit test

---
### Regular git commits

In [344]:
! git add .
! git commit -m "feat: add filtering of successful and unsuccessful for email and name to notebook"
! git push origin feat-1.0-pipeline-spark

[feat-1.0-pipeline-spark 41f8911] feat: add filtering of successful and unsuccessful for mobile_no and date_of_birth to notebook
 Committer: Tony Ng <tonyngmk@MBA.home>
Your name and email address were configured automatically based
on your username and hostname. Please check that they are accurate.
You can suppress this message by setting them explicitly. Run the
following command and follow the instructions in your editor to edit
your configuration file:

    git config --global --edit

After doing this, you may fix the identity used for this commit with:

    git commit --amend --reset-author

 2 files changed, 270 insertions(+), 114 deletions(-)
Enumerating objects: 9, done.
Counting objects: 100% (9/9), done.
Delta compression using up to 8 threads
Compressing objects: 100% (5/5), done.
Writing objects: 100% (5/5), 2.15 KiB | 2.15 MiB/s, done.
Total 5 (delta 3), reused 0 (delta 0), pack-reused 0
remote: Resolving deltas: 100% (3/3), completed with 3 local objects.[K
To github.com