In [1]:
import os
from datetime import datetime

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

In [2]:
def load_file_with_schema(spark, file_path):
    """Read a comma-separated file with a header row and cache the result.

    Note that, despite the function name, no explicit schema (and no
    ``inferSchema`` option) is handed to the reader here.

    Args:
        spark: Session object exposing ``read.csv``.
        file_path: Location of the CSV file to load.

    Returns:
        The object returned by calling ``cache()`` on the loaded DataFrame.
    """
    reader = spark.read
    loaded = reader.csv(file_path, header=True, sep=",")
    return loaded.cache()

In [3]:
def check_int(num):
    """Check that ``num`` can be converted with ``int()``.

    Args:
        num: Value to check (typically a string cell read from a CSV).

    Returns:
        ``num`` unchanged when ``int(num)`` succeeds, ``None`` when ``num``
        is ``None``, otherwise the marker string ``'VALUE ERROR -> <num>'``.
    """
    # Missing values are passed through untouched, the same way check_date
    # and check_timeformat treat None; int(None) would raise TypeError and
    # abort the whole UDF evaluation.
    if num is None:
        return None
    try:
        int(num)
    except (ValueError, TypeError):
        # TypeError covers inputs int() cannot accept at all (e.g. a list);
        # they are flagged as bad data instead of crashing the job.
        return f'VALUE ERROR -> {num}'
    return num

def check_date(date_):
    """Check that ``date_`` parses with the ``%Y-%m-%d`` format.

    Args:
        date_: Date string to check, or ``None``.

    Returns:
        ``date_`` unchanged when it is ``None`` or parses successfully,
        otherwise the marker string ``'VALUE ERROR -> <date_>'``.
    """
    # Nothing to parse for a missing value; hand it back as-is.
    if date_ is None:
        return date_
    try:
        datetime.strptime(date_, "%Y-%m-%d")
    except ValueError:
        return f'VALUE ERROR -> {date_}'
    else:
        return date_

def check_timeformat(time_):
    """Check that ``time_`` parses with the ``%H:%M:%S`` format.

    Args:
        time_: Time string to check, or ``None``.

    Returns:
        ``time_`` unchanged when it is ``None`` or parses successfully,
        otherwise the marker string ``'VALUE ERROR -> <time_>'``.
    """
    is_missing = time_ is None
    if not is_missing:
        try:
            datetime.strptime(time_, "%H:%M:%S")
        except ValueError:
            return f'VALUE ERROR -> {time_}'
    # Reached for missing values and for values that parsed cleanly.
    return time_

In [4]:
def validate_data_timesheet(df):
    """Run the value checkers over the timesheet columns and fail on bad rows.

    ``timesheet_id`` and ``employee_id`` go through ``check_int``, ``date``
    through ``check_date``, and ``checkin`` / ``checkout`` through
    ``check_timeformat``. Rows where any of those columns starts with
    ``'VALUE ERROR'`` afterwards are collected.

    Args:
        df: DataFrame holding the five timesheet columns named above.

    Returns:
        The DataFrame with the five columns replaced by the checker output.

    Raises:
        ValueError: If at least one row was flagged; the message contains
            the collected rows.
    """
    int_checker = udf(lambda value: check_int(value))
    date_checker = udf(lambda value: check_date(value))
    time_checker = udf(lambda value: check_timeformat(value))

    column_checks = [
        ("timesheet_id", int_checker),
        ("employee_id", int_checker),
        ("date", date_checker),
        ("checkin", time_checker),
        ("checkout", time_checker),
    ]

    checked = df
    for column_name, checker in column_checks:
        checked = checked.withColumn(column_name, checker(column_name))

    # OR together one "starts with the error marker" predicate per column.
    any_flagged = None
    for column_name, _ in column_checks:
        flagged = checked[column_name].like('VALUE ERROR%')
        any_flagged = flagged if any_flagged is None else (any_flagged | flagged)

    bad_rows = checked.filter(any_flagged).collect()
    if bad_rows:
        raise ValueError (f"Incorrect Data Type Found\n {bad_rows}")

    return checked

def validate_data_employee(df):
    """Run the value checkers over the employee columns and fail on bad rows.

    ``employe_id``, ``branch_id`` and ``salary`` go through ``check_int``;
    ``join_date`` and ``resign_date`` go through ``check_date``. Rows where
    any of those columns starts with ``'VALUE ERROR'`` afterwards are
    collected.

    Args:
        df: DataFrame holding the five employee columns named above.

    Returns:
        The DataFrame with the five columns replaced by the checker output.

    Raises:
        ValueError: If at least one row was flagged; the message contains
            the collected rows.
    """
    # Only integer and date checks apply to this table, so no time-format
    # UDF is built here.
    NumberCheckerUDF = udf(lambda element: check_int(element))
    DateCheckerUDF = udf(lambda element: check_date(element))
    df = (
        df.withColumn("employe_id", NumberCheckerUDF("employe_id"))
        .withColumn("branch_id", NumberCheckerUDF("branch_id"))
        .withColumn("salary", NumberCheckerUDF("salary"))
        .withColumn("join_date", DateCheckerUDF("join_date"))
        .withColumn("resign_date", DateCheckerUDF("resign_date"))
    )
    # The checkers replace an invalid value with a 'VALUE ERROR -> ...'
    # marker, so a prefix match on any checked column finds the bad rows.
    df_errors = df.filter(
        (df.employe_id.like('VALUE ERROR%'))
        | (df.branch_id.like('VALUE ERROR%'))
        | (df.salary.like('VALUE ERROR%'))
        | (df.join_date.like('VALUE ERROR%'))
        | (df.resign_date.like('VALUE ERROR%'))
    ).collect()

    if len(df_errors) > 0:
        raise ValueError(f"Incorrect Data Type Found\n {df_errors}")
    return df

def calculate_work_duration(checkin,checkout):
    """Tell whether ``checkin`` sorts strictly before ``checkout``.

    Despite the name, no duration is computed: the result is only the
    truth value of ``checkin < checkout``.

    Args:
        checkin: Check-in value.
        checkout: Check-out value, comparable with ``checkin``.

    Returns:
        ``True`` when ``checkin < checkout`` is truthy, ``False`` otherwise.
    """
    return bool(checkin < checkout)
    
    

In [5]:
def pipeline_employee(spark, date_filter=None):
    """Load, optionally filter, validate and de-duplicate the employee file.

    Args:
        spark: Session passed on to ``load_file_with_schema``.
        date_filter: When truthy, only rows whose ``join_date`` equals this
            value are kept.

    Returns:
        The validated DataFrame with exact duplicate rows dropped, or the
        (empty) filtered DataFrame when no rows matched.

    Raises:
        ValueError: Propagated from ``validate_data_employee`` when a row
            fails validation.
    """
    file_path = '../data/employees.csv'
    df = load_file_with_schema(spark, file_path)

    if date_filter:
        df = df.filter(df.join_date == date_filter)

    # Count once and reuse the number: each count() call is a separate
    # Spark action.
    total_records = df.count()
    print('Total Records = {}'.format(total_records))

    if total_records == 0:
        print("No data found ...")
        return df

    df = validate_data_employee(df)
    return df.drop_duplicates()
        
        
def pipeline_timesheet(spark, date_filter=None):
    """Load, optionally filter, validate and de-duplicate the timesheet file.

    Args:
        spark: Session passed on to ``load_file_with_schema``.
        date_filter: When truthy, only rows whose ``date`` equals this value
            are kept.

    Returns:
        The validated DataFrame with one row kept per
        (``employee_id``, ``date``) pair, or the (empty) filtered DataFrame
        when no rows matched.

    Raises:
        ValueError: Propagated from ``validate_data_timesheet`` when a row
            fails validation.
    """
    file_path = '../data/timesheets.csv'
    df = load_file_with_schema(spark, file_path)

    if date_filter:
        df = df.filter(df.date == date_filter)

    # Count once and reuse the number: each count() call is a separate
    # Spark action.
    total_records = df.count()
    print('Total Records = {}'.format(total_records))

    # "No data" means zero rows. Comparing against 1 would skip validation
    # and de-duplication for a legitimate single-row result.
    if total_records == 0:
        print("No data found ...")
        return df

    df = validate_data_timesheet(df)
    df = df.drop_duplicates(subset=['employee_id', 'date'])
    # TODO: derive a 'work_hour' column from 'checkin' / 'checkout'
    # (previously sketched via calculate_work_duration, never enabled).
    return df

In [None]:
# Spark bootstrap for a local run of the ETL, followed by the pipeline call.
appName = "etl"
master = "local"
# A parenthesised chain is used instead of backslash continuations so that
# commenting an option in or out cannot break the statement.
conf = (
    SparkConf()
    .setAppName(appName)
    .setMaster(master)
    # .set("spark.driver.extraClassPath", "sqljdbc_7.2/enu/mssql-jdbc-7.2.2.jre8.jar")
)

# getOrCreate() reuses a context that is already running in this kernel, so
# the cell can be executed more than once; constructing SparkContext(conf=conf)
# directly fails on the second run.
sc = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sc)
spark = sqlContext.sparkSession

df = pipeline_employee(spark, date_filter='2020-12-07')
# df = pipeline_timesheet(spark, date_filter='2019-08-21')

In [None]:
# JDBC connection settings. Credentials are read from the environment so
# that no password is stored in the notebook: set MYSQL_PASSWORD (and
# optionally MYSQL_USER, which defaults to 'root') before running this cell.
# Newer MySQL Connector/J releases name the driver class
# 'com.mysql.cj.jdbc.Driver'.
prop = {
    'user': os.environ.get('MYSQL_USER', 'root'),
    'password': os.environ['MYSQL_PASSWORD'],
    'driver': 'com.mysql.jdbc.Driver',
}

# database address (need to be modified)
url = 'jdbc:mysql://localhost:3306/employees'

# Append the rows exactly once. Issuing a second append of the same
# DataFrame to the same table inserts every row twice.
df.write.jdbc(url=url, table='employee', mode='append', properties=prop)
# sc.stop()

In [None]:
# Print a preview of the DataFrame produced by the pipeline call.
df.show()