In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (current_date, date_sub, dayofmonth, month,
                                   to_date, year)

from datetime import date, timedelta, datetime
import re
import os.path

In [2]:
def daterange(start_date, end_date):
    """Yield consecutive days from start_date through end_date (inclusive).

    Args:
        start_date (date | datetime): first value to yield
        end_date (date | datetime): upper bound of the range

    Yields:
        start_date + n days for n = 0 .. (end_date - start_date).days.
        Nothing is yielded when end_date lies before start_date.
    """
    total_days = int((end_date - start_date).days + 1)
    offset = 0
    while offset < total_days:
        yield start_date + timedelta(offset)
        offset += 1

In [3]:
def create_spark_date_filter(start_date, end_date):
    """Build a Spark DataFrame.filter() expression that matches the columns
    'year', 'month' and 'day' against every date from start_date to end_date
    (both inclusive).

    Args:
        start_date (str | date | datetime): first day, e.g. "2021-6-1" or date(2021, 6, 1)
        end_date (str | date | datetime): last day, e.g. "2021-6-2" or date(2021, 6, 2)

    Returns:
        str: one '(year == Y and month == MM and day == DD)' clause per day,
        joined with ' or '. An empty string when end_date is before start_date.

    Raises:
        ValueError: if a string argument does not parse with "%Y-%m-%d".

    Example:
        start_date = "2021-6-1"   (or date(2021, 6, 1))
        end_date = "2021-6-2"     (or date(2021, 6, 2))
        returns '(year == 2021 and month == 06 and day == 01) or (year == 2021 and month == 06 and day == 02)'
    """
    # Normalise both bounds to plain dates. Strings are parsed as before;
    # date/datetime objects (as shown in the example) are accepted too instead
    # of failing inside strptime.
    bounds = []
    for value in (start_date, end_date):
        if isinstance(value, str):
            value = datetime.strptime(value, "%Y-%m-%d")
        if isinstance(value, datetime):
            value = value.date()
        bounds.append(value)
    first_day, last_day = bounds

    # one clause per day in the inclusive range
    clauses = [
        single_date.strftime('(year == %Y and month == %m and day == %d)')
        for single_date in daterange(first_day, last_day)
    ]
    return " or ".join(clauses)

In [4]:
def interpret_input_date_arg(date_arg):
    """Resolve the special date keywords accepted as job arguments.

    Special cases (both matched case-insensitively):
     - 'TODAY' is replaced with the current date
     - 'n_DAYS_AGO' (n is a non-negative integer, e.g. '2_DAYS_AGO') is
       replaced with the date n days before today

    Args:
        date_arg (str): ideally "TODAY" or "n_DAYS_AGO"; any other value is
            passed through untouched

    Return:
        str formatted as "%Y-%m-%d" when a keyword matched; otherwise the
        original date_arg, unchanged.
    """
    text = str(date_arg)

    # check if it's supposed to be today's date
    if text.casefold() == "TODAY".casefold():
        return date.today().strftime("%Y-%m-%d")

    # check for a relative date to today; IGNORECASE keeps this consistent with
    # the case-insensitive TODAY check above. re.match only anchors at the
    # start, so text after "_DAYS_AGO" is tolerated exactly as before.
    match = re.match(r'(\d+)_DAYS_AGO', text, flags=re.IGNORECASE)
    if match:
        days = int(match.group(1))
        return (date.today() - timedelta(days)).strftime("%Y-%m-%d")

    # no special matches, return the original date string
    return date_arg

In [5]:
def get_version_number():
    """Return the contents of 'version.yaml' in the current working directory.

    Returns:
        str: the file's text exactly as read (not stripped) when the file
        exists and contains something other than whitespace; otherwise the
        literal 'No version defined'.
    """
    if os.path.exists('version.yaml'):
        # The context manager closes the handle deterministically; previously
        # the file object was left for the garbage collector to close.
        with open('version.yaml', 'r') as version_file:
            version = version_file.read()
        if version.strip():
            return version
    return 'No version defined'

In [6]:
# set top level variables
# Database names interpolated into the CREATE DATABASE / CREATE TABLE statements below.
# NOTE(review): database_mart is not referenced in the visible part of this notebook.
database_raw = "raw"
database_optimise = "optimise"
database_mart = "mart"
# Table names; also interpolated into the landing paths and temp view names below.
table_branch = "branch_0"
table_constraint = "constraint_0"
table_constraintbranch = "constraintbranch_0"

In [7]:
# Build (or reuse) the Hive-enabled SparkSession used by every cell below.
spark = (
    SparkSession.builder
    .appName("full_branch_constraint_constraintbranch")
    .config("spark.hadoop.fs.s3a.s3guard.ddb.region", "us-east-2")
    .config("spark.yarn.access.hadoopFileSystems", "s3a://go01-demo/")
    .enableHiveSupport()
    .getOrCreate()
)

Setting spark.hadoop.yarn.resourcemanager.principal to pauldefusco


In [8]:
# Setup logger
# Obtain a log4j logger from the Spark JVM (through spark._jvm) so notebook
# messages go to the same log4j logging as Spark's own output.
log4jLogger = spark._jvm.org.apache.log4j
logger = log4jLogger.LogManager.getLogger(__name__)
# Record which build is running; get_version_number() falls back to
# 'No version defined' when version.yaml is missing or blank.
logger.info("Job is running on "+ get_version_number())

In [9]:
# Deployment settings. env is interpolated into the bucket name and, together
# with project, into the LOCATION of every database/table created below.
env = "prod"
bucket = "s3a://go01-demo/tp-cdp-datalake-landing-%s" % (env)
project = "datalake"

In [10]:
# hive configuration
# "dynamic" partition overwrite: an overwriting insert is expected to replace only
# the partitions present in the incoming data rather than the whole table.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
# Allow inserts that derive the partition value from the data (used by the
# "partition(modified)" insert further down) without any static partition.
spark.conf.set("hive.exec.dynamic.partition", "true")
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
# Read files located in nested sub-directories of an input path.
spark.conf.set("spark.hive.mapred.supports.subdirectories", "true")
spark.conf.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true")

In [11]:
# drop temp view
# Removes the temp view "<database_raw>_mkt_<table_branch>" if it exists. With
# database_raw = "raw" this is the same name ("raw_mkt_branch_0") that is
# registered again further down with createOrReplaceTempView.
spark.catalog.dropTempView("%s_mkt_%s" % (database_raw, table_branch))

In [12]:
# create raw database with location
# LOCATION resolves to '<bucket>/<env>/<project>/<database_raw>'; the statement
# is a no-op when the database already exists.
spark.sql(
    "CREATE DATABASE IF NOT EXISTS %s LOCATION '%s/%s/%s/%s'" % (database_raw, bucket, env, project, database_raw))

Hive Session ID = 647bcee2-c420-4686-b9de-c87b46831039


DataFrame[]

In [13]:
# drop table
# Drops <database_raw>.<table_branch> if present so the CREATE EXTERNAL TABLE
# below defines it afresh on every run.
spark.sql("DROP TABLE IF EXISTS %s.%s" % (database_raw, table_branch))

DataFrame[]

In [14]:
# Landing parquet location for the branch table.
# basePath is the table root (handed to the reader as its basePath option);
# inputPath globs every year=/month=/day= partition directory beneath it.
# Narrowing to the job's date range happens afterwards with a DataFrame
# filter, not through these paths.
basePath = "s3a://go01-demo/tp-cdp-datalake-landing-%s/%s/raw_mkt_mkt%s" % (
    env, database_raw, table_branch)
# Derived from basePath so the glob cannot drift from the table root.
inputPath = basePath + "/year=*/month=*/day=*"

In [15]:
# display the resolved landing root for a visual check
basePath

's3a://go01-demo/tp-cdp-datalake-landing-prod/raw/raw_mkt_mktbranch_0'

In [16]:
# display the resolved partition glob for a visual check
inputPath

's3a://go01-demo/tp-cdp-datalake-landing-prod/raw/raw_mkt_mktbranch_0/year=*/month=*/day=*'

In [17]:
# temporary: setup test environment but sharing the same kinesis source as dev
# (disabled: the override below is commented out, so basePath/inputPath from the
# earlier cell are used as-is)
#share_dev_source = spark.conf.get("spark.driver.share.dev.source", "false")
#if share_dev_source and share_dev_source == "true":
#    basePath = "s3a://go01-demo/tp-cdp-datalake-landing-%s/%s/raw_mkt_mkt%s" % (
#        "prod", database_raw, table_branch)
#    inputPath = "s3a://go01-demo/tp-cdp-datalake-landing-%s/%s/raw_mkt_mkt%s/year=*/month=*/day=*" % (
#        "prod", database_raw, table_branch)

# Load every *.parquet file under the partition glob into one DataFrame,
# merging the schemas of the individual files.
# NOTE(review): Spark documents recursiveFileLookup as disabling partition
# inference, which would make the basePath option ineffective here — confirm
# where the year/month/day columns used by the later date filter come from.
inputDf_raw_branch = spark.read.options(
    mergeSchema="true", basePath=basePath, recursiveFileLookup=True, pathGlobFilter="*.parquet").parquet(inputPath)

                                                                                

In [28]:
# show the first row of the loaded DataFrame to inspect its columns
inputDf_raw_branch.head()

                                                                                

Row(table_name='MKTBRANCH', operation='UPDATE', timestamp='1627817907216', primarykey='5510.0', branchid='5510.0', csmmrid='598da924-30c7-4f26-b168-9c3ecd8135b8', branchname='WHI_T1.T1', branchtype='XF', currentnearopen='0', currentneardead='0', currentfaropen='0', currentfardead='0', currentnearmw='5.134', currentfarmw='-5.1', currentnearflow='7', currentfarflow='-7', currentlimit='73', currenttap='4.0', threewindingtype='0.0', createdby='holbrookr', createddate='1245035717014', modifiedby='mbl_fuse', modifieddate='1627817737643', auditosuser='mbl_fuse', auditmodule='JDBC Thin Client', audithost='172.30.18.206', discrepancytime='', discrepancyskip='0', discrepancybranchname='', year=2021, month=8, day=1)

In [18]:
# Fixed date range for this run, as "%Y-%m-%d" strings; both ends are included
# by create_spark_date_filter when the raw data is filtered below.
firstDay = '2021-08-01'
lastDay = '2021-08-03'

In [19]:
# date range for job
# NOTE: the two lines below, which would read the range from the Spark conf
# (defaulting to '2_DAYS_AGO' -> 'TODAY'), are commented out, so firstDay and
# lastDay must already be defined before this cell runs.
#firstDay=interpret_input_date_arg(spark.conf.get("spark.driver.firstDay", '2_DAYS_AGO'))      # firstDay="2021-11-05"
#lastDay=interpret_input_date_arg(spark.conf.get("spark.driver.lastDay", 'TODAY'))             # lastDay="2021-11-07"
logger.info(f'Data date range ("firstDay" -> "lastDay"): "{firstDay}" -> "{lastDay}"')

In [20]:
# echo the date range in the notebook output, one item per line
print("First day", firstDay, "Last day", lastDay, sep="\n")

First day
2021-08-01
Last day
2021-08-03


In [21]:
# Keep only the rows whose year/month/day fall inside [firstDay, lastDay] and
# expose them to SQL as the temp view "raw_mkt_<table_branch>".
(
    inputDf_raw_branch
    .filter(create_spark_date_filter(firstDay, lastDay))
    .createOrReplaceTempView("raw_mkt_%s" % (table_branch))
)

In [31]:
# create hive table with location
# External parquet table <database_raw>.<table_branch>, partitioned by the
# "modified" date column, stored at
# '<bucket>/<env>/<project>/<database_raw>/<table_branch>'.
# The column order here is the order the insert statement below must follow.
spark.sql("""
CREATE EXTERNAL TABLE IF NOT EXISTS %s.%s (
    operation string,
    timestamp bigint,
    primarykey string,
    branchid string,
    branchname string,
    branchtype string,
    currentnearopen int,
    currentneardead int,
    currentfaropen int,
    currentfardead int,
    currentnearmw float,
    currentfarmw float,
    currentnearflow float,
    currentfarflow float,
    currentlimit float,
    threewindingtype int,
    createddate bigint,
    modifieddate bigint
)
PARTITIONED BY (modified date)
STORED AS PARQUET LOCATION '%s/%s/%s/%s/%s'
""" % (database_raw, table_branch, bucket, env, project, database_raw, table_branch))


DataFrame[]

In [32]:
# display the fully qualified name of the raw table
database_raw + "." + table_branch

'raw.branch_0'

In [33]:
# insert into hive table
# Appends the rows of the temp view raw_mkt_<table_branch> to
# <database_raw>.<table_branch> using a dynamic partition insert: the last
# selected expression supplies the "modified" partition value. It is derived
# from modifieddate, which is divided by 1000 (i.e. treated as epoch
# milliseconds), converted with from_unixtime and truncated to a date.
spark.sql("""
insert
into %s.%s
partition(modified)
from raw_mkt_%s
select
    operation,
    timestamp,
    primarykey,
    branchid,
    branchname,
    branchtype,
    currentnearopen,
    currentneardead,
    currentfaropen,
    currentfardead,
    currentnearmw,
    currentfarmw,
    currentnearflow,
    currentfarflow,
    currentlimit,
    threewindingtype,
    createddate,
    modifieddate,
    to_date(from_unixtime(cast(round(modifieddate/1000) as bigint))) as modified
""" % (database_raw, table_branch, table_branch))

                                                                                

DataFrame[]

optimise_branch_0

In [34]:
# create database with location
# LOCATION resolves to '<bucket>/<env>/<project>/<database_optimise>'; the
# statement is a no-op when the database already exists.
spark.sql(
    "CREATE DATABASE IF NOT EXISTS %s LOCATION '%s/%s/%s/%s'" % (database_optimise, bucket, env, project, database_optimise))

DataFrame[]

In [35]:
# External parquet table <database_optimise>.<table_branch> with the same
# columns and "modified" partition column as the raw table, stored at
# '<bucket>/<env>/<project>/<database_optimise>/<table_branch>'.
# Unlike the raw table it is not dropped first, so an existing table is kept.
spark.sql("""
CREATE EXTERNAL TABLE IF NOT EXISTS %s.%s (
    operation string,
    timestamp bigint,
    primarykey string,
    branchid string,
    branchname string,
    branchtype string,
    currentnearopen int,
    currentneardead int,
    currentfaropen int,
    currentfardead int,
    currentnearmw float,
    currentfarmw float,
    currentnearflow float,
    currentfarflow float,
    currentlimit float,
    threewindingtype int,
    createddate bigint,
    modifieddate bigint
)
PARTITIONED BY (modified date)
STORED AS PARQUET LOCATION '%s/%s/%s/%s/%s'
""" % (database_optimise, table_branch, bucket, env, project, database_optimise, table_branch))

DataFrame[]

In [37]:
# Select the rows to copy from <database_raw>.<table_branch> into the optimise
# table: every "modified" partition from one day before firstDay onwards.
# Columns are listed in the optimise table's order with the partition column
# last, because the following insertInto call is position-based.
insertDf_raw_branch = spark.sql("""
select
    operation,
    timestamp,
    primarykey,
    branchid,
    branchname,
    branchtype,
    currentnearopen,
    currentneardead,
    currentfaropen,
    currentfardead,
    currentnearmw,
    currentfarmw,
    currentnearflow,
    currentfarflow,
    currentlimit,
    threewindingtype,
    createddate,
    modifieddate,
    modified
from %s.%s
where
    (modified>=date_sub(to_date("%s", "yyyy-MM-dd"), 1))
""" % (database_raw, table_branch, firstDay))

In [38]:
# Write the selected rows into <database_optimise>.<table_branch> in overwrite mode.
# NOTE(review): with spark.sql.sources.partitionOverwriteMode=dynamic (set near
# the top of this notebook) only the partitions present in insertDf_raw_branch
# are expected to be replaced — confirm against the target table.
insertDf_raw_branch.write.insertInto("%s.%s" % (database_optimise, table_branch), overwrite=True)

                                                                                

raw_constraintbranch_0

In [39]:
# Reset state for the constraintbranch load. The cells that follow read the
# landing files of table_constraintbranch, so the temp view and raw table
# cleared here must be the constraintbranch ones (this cell previously
# targeted table_constraint, leaving any stale constraintbranch view/table in
# place).

# drop temp view if it exists
spark.catalog.dropTempView("%s_mkt_%s" % (database_raw, table_constraintbranch))

# create database with location
spark.sql(
    "CREATE DATABASE IF NOT EXISTS %s LOCATION '%s/%s/%s/%s'" % (database_raw, bucket, env, project, database_raw))

# drop table
spark.sql("DROP TABLE IF EXISTS %s.%s" % (database_raw, table_constraintbranch))

DataFrame[]

In [40]:
# Landing parquet location for the constraintbranch table.
# basePath is the table root (handed to the reader as its basePath option);
# inputPath globs every year=/month=/day= partition directory beneath it,
# so no date restriction is applied by these paths.
basePath = "s3a://go01-demo/tp-cdp-datalake-landing-%s/%s/raw_mkt_mkt%s" % (
    env, database_raw, table_constraintbranch)
# Derived from basePath so the glob cannot drift from the table root.
inputPath = basePath + "/year=*/month=*/day=*"

In [41]:
# display the resolved landing root for a visual check
basePath

's3a://go01-demo/tp-cdp-datalake-landing-prod/raw/raw_mkt_mktconstraintbranch_0'

In [42]:
# Load every *.parquet file under the constraintbranch partition glob into one
# DataFrame, merging the schemas of the individual files.
# NOTE(review): Spark documents recursiveFileLookup as disabling partition
# inference, which would make the basePath option ineffective here — confirm
# that any year/month/day columns needed downstream are actually present.
inputDf_raw_constraintbranch = spark.read.options(
    mergeSchema="true", basePath=basePath, recursiveFileLookup=True, pathGlobFilter="*.parquet").parquet(inputPath)

                                                                                