In [499]:
import json
import os

import findspark
findspark.init()

import pyspark
import pandas as pd
import numpy as np
import jsonschema
from jsonschema import validate


#sc = pyspark.SparkContext(appName="ETLPipeline")


In [461]:
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.types import StructType, StructField, StringType,IntegerType
from py4j.protocol import Py4JJavaError
#sqlContext = SQLContext(sc)

In [500]:
# Root folder holding the pipeline's inputJson/, schemaDef/, input/ and output/ folders.
# Overridable through the LANDING_PATH environment variable so the notebook is not tied
# to one machine; the original location remains the default.
landingPath = os.environ.get("LANDING_PATH", 'F:/interview_questions/submission/')

### parse input json

In [565]:
def readJSON(landingPath, basePath, fileName):
    """Load and parse the JSON file at ``landingPath/basePath/fileName``.

    Args:
        landingPath: Root directory of the pipeline's files.
        basePath: Sub-directory under ``landingPath`` that holds the file.
        fileName: Name of the JSON file, including its extension.

    Returns:
        The decoded JSON document, as produced by ``json.load``.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file content is not valid JSON.
    """
    # os.path.join avoids the doubled separators that manual concatenation
    # produced when landingPath already ends with "/".
    jsonPath = os.path.join(landingPath, basePath, fileName)
    # Read as UTF-8 explicitly rather than relying on the platform default encoding.
    with open(jsonPath, encoding="utf-8") as f:
        return json.load(f)

In [566]:
# Load the user-supplied job parameters from <landingPath>/inputJson/param.json.
param = readJSON(landingPath, "inputJson", "param.json")

In [567]:
# Load the JSON Schema that the parameter file is validated against
# (<landingPath>/schemaDef/paramSchema.json).
paramSchema = readJSON(landingPath, "schemaDef", "paramSchema.json")

## validate if supplied json is correct

In [568]:
def validate_json(json_data, schema):
    """Validate ``json_data`` against ``schema`` with ``jsonschema.validate``.

    A ``ValidationError`` is printed and reported through the return value
    instead of being propagated.

    Returns:
        A ``(is_valid, message)`` tuple: ``(True, "Given JSON data is Valid")``
        on success, ``(False, "Given JSON data is InValid")`` on a validation
        failure.
    """
    try:
        validate(instance=json_data, schema=schema)
    except jsonschema.exceptions.ValidationError as validationError:
        # Show the detailed reason to the user; callers only get the summary text.
        print(validationError)
        return False, "Given JSON data is InValid"
    else:
        return True, "Given JSON data is Valid"

In [569]:
# (is_valid, message) tuple returned by validate_json for the loaded parameter file.
isUserJsonValid = validate_json(param, paramSchema)   

In [570]:
# Stop the run when the parameter file failed validation.
# SystemExit is raised directly (no `sys` import needed at this point of the
# notebook) and carries the validation message, so the exit is neither silent
# nor reported as success.
if not isUserJsonValid[0]:
    raise SystemExit(isUserJsonValid[1])

In [571]:
# Unpack the validated job parameters into the module-level names used by later cells.

# Date window and the flag that selects range filtering.
toDate, fromDate = param["toDt"], param["fromDt"]
dateRangeFlg = param["dateRangeFlag"]

# Partitioning / layout related settings.
coalesce, bucket = param["coalesce"], param["bucket"]
outputPartition = ",".join(param["outputPartition"])
commonColumn = param["commonColumn"]

# SQL statement to run over the registered input tables.
query = param["query"]

# Output file description.
outputSpec = param["outputDF"]
outputFileName = outputSpec["fileName"]
outputFileType = outputSpec["fileType"]
outputFileSep = outputSpec["separator"]
outputFileCompression = outputSpec["compressionType"]




In [543]:
query

'select employee.id as id, employee.name as name, department.id as deptId, employee.part_dt as part_dt, employee.hour as hour from employee inner join department on employee.dept_name=department.dept_name'

### tuning parameters can be passed by user depending on use case

In [544]:
# Spark tuning options come from the parameter file as a mapping; turn them
# into a list of (key, value) pairs for SparkConf.setAll.
tuningParams = param["tuningParams"]
tuningParamsTuple = list(tuningParams.items())
tuningParamsTuple

[('spark.executor.memory', '8g'),
 ('spark.executor.cores', '3'),
 ('spark.cores.max', '3'),
 ('spark.driver.memory', '8g')]

In [545]:
# Build a SparkConf carrying every user-supplied tuning option.
config = pyspark.SparkConf().setAll(tuningParamsTuple)

### create spark session for the app

In [546]:
# Create (or reuse) the Spark session for this pipeline with the tuning config applied.
# NOTE(review): the recorded output of spark.sparkContext.getConf().getAll() below
# shows spark.master=local[*] and spark.app.name=pyspark-shell, which suggests
# getOrCreate() returned an already-running session and the master/appName set
# here did not take effect -- TODO confirm on a fresh kernel.
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName('SamplePipeline')
    .config(conf=config)
    .getOrCreate()
)

In [547]:
spark.sparkContext.getConf().getAll()

[('spark.driver.host', 'host.docker.internal'),
 ('spark.executor.id', 'driver'),
 ('spark.app.name', 'pyspark-shell'),
 ('spark.cores.max', '3'),
 ('spark.driver.port', '50900'),
 ('spark.rdd.compress', 'True'),
 ('spark.driver.memory', '8g'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.executor.memory', '8g'),
 ('spark.executor.cores', '3'),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.id', 'local-1614490772162'),
 ('spark.ui.showConsoleProgress', 'true')]

In [548]:
#imports for JSON parsing, quinn schema validation, process exit and date arithmetic
import json
from quinn import validate_schema,validate_absence_of_columns, validate_presence_of_columns
import sys
from datetime import date, timedelta

In [549]:
def getSubtractedDate(toSubtract):
    """Return today's date moved back by ``toSubtract`` days.

    Args:
        toSubtract: Number of days to go back from ``date.today()``.

    Returns:
        A ``datetime.date``.
    """
    offset = timedelta(days=toSubtract)
    return date.today() - offset

In [550]:
# Yesterday's date (a datetime.date); passed to readInput as the fallback
# partition date for when no date range is requested.
prevDate = getSubtractedDate(1)
print(prevDate)

2021-02-28


### read files function

In [551]:
#function to read multiple type of files:
def readInput(landingPath, fileType, fileName, delim, custom_schema,dateRangeFlg, fromDate, toDate, prevDate):
    """Read one input table from ``<landingPath>/input/<fileName>/`` as a DataFrame.

    The data is filtered on ``part_dt`` and repartitioned on the notebook-level
    ``commonColumn``. Uses the notebook-level ``spark`` session.

    Args:
        landingPath: Root directory of the pipeline's files.
        fileType: ``"parquet"``, ``"csv"`` or ``"json"``.
        fileName: Name of the input folder (the table name).
        delim: Field separator, used for CSV input only.
        custom_schema: Schema applied when reading parquet and JSON input.
            CSV input is read with its header row and no explicit schema,
            as before.
        dateRangeFlg: ``"y"`` to filter on the ``fromDate``..``toDate`` range;
            any other value filters on ``prevDate`` only.
        fromDate: Inclusive lower bound for ``part_dt``.
        toDate: Inclusive upper bound for ``part_dt``.
        prevDate: Single partition date; a ``date``/``datetime`` is rendered
            as ``yyyyMMdd`` to match the form used by the range filter.

    Returns:
        The filtered, repartitioned DataFrame.

    Raises:
        ValueError: If ``fileType`` is not one of the supported types.
    """
    if dateRangeFlg == "y":
        whereClause = "part_dt >=" + str(fromDate) + " and part_dt<=" + str(toDate)
    else:
        # prevDate usually arrives as a datetime.date, which cannot be
        # concatenated to a str; "===" is also not a SQL operator.
        if hasattr(prevDate, "strftime"):
            prevDateKey = prevDate.strftime("%Y%m%d")
        else:
            prevDateKey = str(prevDate)
        whereClause = "part_dt = " + prevDateKey
    print(whereClause)

    # One path rule for every file type, tolerant of a trailing "/" on landingPath.
    inputPath = landingPath.rstrip("/") + "/input/" + fileName + "/"

    # The schema has to be set on the reader *before* load(); DataFrame.schema
    # is a property and cannot be called on the loaded frame.
    if fileType == "parquet":
        reader = spark.read.format('parquet').schema(custom_schema)
    elif fileType == "csv":
        reader = spark.read.format('csv').option("sep", delim).option("header", "true")
    elif fileType == 'json':
        reader = spark.read.format('json').schema(custom_schema)
    else:
        raise ValueError("Invalid File type: " + str(fileType))

    # The date filter is applied to every file type (JSON input used to skip it).
    return reader.load(inputPath).where(whereClause).repartition(commonColumn)
    

In [552]:
### write files function

In [553]:
#function to write df to file:
def writeOutput(df, landingPath, fileType, fileName, delim,outputPartitions, outputFileCompression):
    """Write ``df`` to ``<landingPath>/output/<fileName>/``, overwriting existing data.

    Args:
        df: DataFrame to persist.
        landingPath: Root directory of the pipeline's files.
        fileType: ``"parquet"``, ``"csv"`` or ``"json"``.
        fileName: Name of the output folder.
        delim: Field separator, applied to CSV output only.
        outputPartitions: Column name(s) to partition the output by; skipped
            when empty or ``None``.
        outputFileCompression: Compression codec, applied to parquet output
            only (as before); skipped when ``None``.

    Raises:
        ValueError: If ``fileType`` is not one of the supported types.
    """
    if fileType not in ("parquet", "csv", "json"):
        raise ValueError("Invalid output file type: " + str(fileType))

    # Tolerate a trailing "/" on landingPath instead of emitting "//output/".
    outputPath = landingPath.rstrip("/") + "/output/" + fileName + "/"

    writer = df.write.mode('overwrite')
    if fileType == "parquet" and outputFileCompression is not None:
        writer = writer.option("compression", outputFileCompression)
    elif fileType == "csv" and delim:
        # The requested separator used to be ignored for CSV output.
        writer = writer.option("sep", delim)
    if outputPartitions:
        writer = writer.partitionBy(outputPartitions)

    # fileType was validated above, so it names one of the writer's
    # parquet/csv/json methods.
    getattr(writer, fileType)(outputPath)

In [554]:
#function to read schema files:
def getSchema(landingPath,fileName):
    """Load ``<landingPath>/schemaDef/<fileName>.json`` and build a Spark ``StructType`` from it.

    Uses the notebook-level ``spark`` session to read the file.

    Args:
        landingPath: Root directory of the pipeline's files.
        fileName: Schema file name without the ``.json`` extension (the table name).

    Returns:
        The ``StructType`` built by ``StructType.fromJson`` from the file's JSON content.
    """
    schemaPath = landingPath+"/schemaDef/"+fileName+".json"
    # Go through the active session's SparkContext: a standalone `sc` is never
    # created in this notebook (its creation is commented out).
    schemaFiles = spark.sparkContext.wholeTextFiles(schemaPath).collect()
    # wholeTextFiles yields (path, content) pairs; only the content is needed.
    _, schemaText = schemaFiles[0]
    return StructType.fromJson(json.loads(str(schemaText)))

### basic schema validation using quinn library

In [555]:
def schemaValidator(custom_schema,df,tableName):
    """Validate ``df`` against ``custom_schema`` using quinn's ``validate_schema``.

    Args:
        custom_schema: The schema the DataFrame is expected to satisfy.
        df: DataFrame to check.
        tableName: Table name used in the error message.

    Raises:
        Exception: With args ``("<tableName> has invalid schema", <original error>)``
            when validation fails; the original error is attached as the cause.
    """
    try:
        validate_schema(df, custom_schema)
    except Exception as err:
        # `except Exception` (not a bare except) lets KeyboardInterrupt and
        # SystemExit pass through instead of being re-labelled as schema errors.
        raise Exception(tableName+" has invalid schema", err) from err
        
    

In [556]:
# Build, per input table declared in the parameter file, its DataFrame together
# with the schema it is expected to have.
dfs = {}
for inputDef in param["inputDF"]:
    table = inputDef["table"]
    custom_schema = getSchema(landingPath, table)
    inputFrame = readInput(
        landingPath,
        inputDef["fileType"],
        table,
        inputDef["separator"],
        custom_schema,
        dateRangeFlg,
        fromDate,
        toDate,
        prevDate,
    )
    dfs[table] = {"dataframe": inputFrame, "inputSchema": custom_schema}

part_dt >=20210216 and part_dt<=20210228
part_dt >=20210216 and part_dt<=20210228


In [557]:
dfs

{'department': {'dataframe': DataFrame[id: string, dept_name: string, part_dt: int, hour: int],
  'inputSchema': StructType(List(StructField(id,StringType,true),StructField(dept_name,StringType,true),StructField(part_dt,IntegerType,true),StructField(hour,IntegerType,true)))},
 'employee': {'dataframe': DataFrame[id: string, name: string, dept_name: string, salary: string, part_dt: int, hour: int],
  'inputSchema': StructType(List(StructField(id,StringType,true),StructField(name,StringType,true),StructField(salary,StringType,true),StructField(part_dt,IntegerType,true),StructField(hour,IntegerType,true),StructField(dept_name,StringType,true)))}}

In [558]:
# Validate each input DataFrame against its declared schema, then expose it to
# Spark SQL under its table name so the user query can reference it.
for tableName, tableInfo in dfs.items():
    schemaValidator(tableInfo["inputSchema"], tableInfo["dataframe"], tableName)
    # createOrReplaceTempView replaces the deprecated registerTempTable.
    tableInfo['dataframe'].createOrReplaceTempView(tableName)

### register as table to run user query on it

### read the user query and store output in a DF

In [561]:
query

'select employee.id as id, employee.name as name, department.id as deptId, employee.part_dt as part_dt, employee.hour as hour from employee inner join department on employee.dept_name=department.dept_name'

In [562]:
# Run the user-supplied query over the registered input views.
# Uses the active `spark` session: `sqlContext` is never created in this
# notebook (its creation is commented out).
outputDF = spark.sql(query)

In [563]:
outputDF

DataFrame[id: string, name: string, deptId: string, part_dt: int, hour: int]

### write output

In [564]:
# Persist the query result. The raw param["outputPartition"] value is passed
# here, not the comma-joined `outputPartition` string built earlier.
writeOutput(outputDF, landingPath, outputFileType,outputFileName, outputFileSep,param["outputPartition"], outputFileCompression)