In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.utils import *
from datetime import datetime

# Configuration

In [None]:
# Spark session shared by every cell below.
# Master is 'local'; both maxToStringFields settings are 2000 and the number of
# shuffle partitions is 10.
spark = (
    SparkSession.builder
    .appName('python spark test')
    .config('spark.master', 'local')
    .config('spark.sql.shuffle.partitions', 10)
    .config('spark.debug.maxToStringFields', 2000)
    .config('spark.sql.debug.maxToStringFields', 2000)
    .getOrCreate()
)

# Get data from ADLS

In [None]:
# Get data stored in ORC or Parquet format
def load_local_trans_adls(tran_adls_path, data_format="parquet"):
    """Read the dataset at ``tran_adls_path`` into a Spark DataFrame.

    Args:
        tran_adls_path: Location of the data, passed unchanged to the Spark reader.
        data_format: Storage format of the data, ``"parquet"`` (default) or ``"orc"``.

    Returns:
        The DataFrame returned by the Spark reader.

    Raises:
        ValueError: If ``data_format`` is not one of the supported formats.
    """
    supported_formats = ("orc", "parquet")
    # Reject unknown formats before touching Spark so the caller gets a clear
    # error instead of an unbound local variable at the return statement.
    if data_format not in supported_formats:
        raise ValueError(
            "Unsupported data_format {!r}; expected one of {}".format(
                data_format, ", ".join(supported_formats)))

    if data_format == "orc":
        return spark.read.format(data_format).load(tran_adls_path)
    return spark.read.parquet(tran_adls_path)

# Compare Schema

In [None]:
def check_Schema(test_df, dev_df):
    """Compare the column names of two DataFrames.

    Args:
        test_df: Frame whose ``columns`` are treated as the expected schema.
        dev_df: Frame whose ``columns`` are compared against ``test_df``.

    Returns:
        list: If both frames have exactly the same set of column names, those
        names in ``test_df`` order. Otherwise the names present in only one
        frame: the ones only in ``test_df`` first, then the ones only in
        ``dev_df``.
    """
    # dict.fromkeys removes repeated names while keeping first-seen order.
    test_cols = list(dict.fromkeys(test_df.columns))
    dev_cols = list(dict.fromkeys(dev_df.columns))
    test_names, dev_names = set(test_cols), set(dev_cols)

    # The schemas match only when neither side has a column the other lacks;
    # checking a single direction would hide extra columns in test_df.
    if test_names == dev_names:
        return test_cols

    only_in_test = [name for name in test_cols if name not in dev_names]
    only_in_dev = [name for name in dev_cols if name not in test_names]
    return only_in_test + only_in_dev

# Function to check data accuracy

In [None]:
def accuracy_test(test_df, dev_df):
    """Print a two-step accuracy report comparing ``test_df`` with ``dev_df``.

    Step 1.1 compares the column names of both frames. If they differ, the
    differing names are printed and the function stops there. Step 1.2
    compares the rows, duplicates included, and shows up to five rows that
    exist in only one of the frames.

    Args:
        test_df: DataFrame holding the expected data.
        dev_df: DataFrame holding the data to verify.

    Returns:
        None. The outcome is reported through ``print`` and ``show`` only.
    """
    print('1. Accuracy_test')
    print('1.1 Schema_test')
    test_cols = list(test_df.columns)
    dev_cols = list(dev_df.columns)
    test_names, dev_names = set(test_cols), set(dev_cols)

    # Compare the names themselves: two schemas with the same number of fields
    # can still contain different fields. The length checks keep rejecting
    # frames that repeat a column name.
    schemas_match = (test_names == dev_names
                     and len(test_cols) == len(dev_cols) == len(test_names))
    if schemas_match:
        print('- accuracy_test: Test Pass')
        print('\t + test_df: {} fields \n \t + dev_df: {} fields \n\t Two Schema are the same'
              .format(len(test_cols), len(dev_cols)))
    else:
        print('- accuracy_test: Test Fail')
        print('\t + test_df: {} fields \n \t + dev_df: {} fields \n\t Two Schema are not the same'
              .format(len(test_cols), len(dev_cols)))
        print('List of different fields: ')
        only_in_test = [name for name in test_cols if name not in dev_names]
        only_in_dev = [name for name in dev_cols if name not in test_names]
        for field in only_in_test + only_in_dev:
            print('+', field)
        return None

    # Project both frames onto one shared column list so the columns line up:
    # intersectAll / exceptAll resolve columns by position, not by name.
    dev_df = dev_df.select(test_cols)
    test_df = test_df.select(test_cols)
    output_count = dev_df.count()
    expected_count = test_df.count()
    intersect_data = test_df.intersectAll(dev_df)
    intersect_count = intersect_data.count()
    print('1.2 Check Data in two path')
    print(
        '- accuracy_test: \n\t+ size of intersect data: {} \n\t+ test_df count: {}  \n\t+ dev_df count: {}'.format(
            intersect_count,
            expected_count,
            output_count))
    if intersect_count != expected_count or intersect_count != output_count:
        print(
            '- accuracy_test: Test Fail. \nTest_df has {} rows but intersect data between test_df and dev_df has {} rows'.format(
                expected_count, intersect_count))
        # exceptAll keeps duplicates, so a row that merely occurs more often
        # on one side is still listed as a difference.
        diff_expected_vs_intersect = test_df.exceptAll(intersect_data)
        diff_output_vs_intersect = dev_df.exceptAll(intersect_data)
        if diff_expected_vs_intersect.count() > 0:
            print(
                '- accuracy_test: test_df data has these rows. But not has in dev_df data <Only show first 5 rows>:')
            diff_expected_vs_intersect.show(5, truncate=False)
        if diff_output_vs_intersect.count() > 0:
            print(
                '- accuracy_test: dev_df data has these rows. But not has in test_df data <Only show first 5 rows>:')
            diff_output_vs_intersect.show(5, truncate=False)
    else:
        print('- accuracy_test: Test Pass. \nData in test_df and dev_df are equals')
    return None

# Function to check data completeness

In [None]:
def completeness_test(test_df, dev_df):
    """Print a two-step completeness report comparing ``test_df`` with ``dev_df``.

    Step 2.1 compares the row counts of the two frames. Step 2.2 restricts
    both frames to the columns they share and compares the rows, duplicates
    included, showing up to five rows that exist in only one of the frames.

    Args:
        test_df: DataFrame holding the expected data.
        dev_df: DataFrame holding the data to verify.

    Returns:
        None. The outcome is reported through ``print`` and ``show`` only.
    """
    print('2. Completeness_test')
    print('2.1 Amount of two data')
    # Count each frame once and reuse the numbers; count() is a Spark action.
    expected_count = test_df.count()
    output_count = dev_df.count()
    if expected_count == output_count:
        print('- completeness_test: Test Pass')
    else:
        print('- completeness_test: Test Fail')
    print('\t+ amount of test_df: {}\n\t+ amount of dev_df: {}'.format(expected_count, output_count))

    print('2.2 All data in two path')
    # One shared, ordered column list for both frames: intersectAll / exceptAll
    # resolve columns by position, not by name.
    dev_names = set(dev_df.columns)
    common_cols = [name for name in dict.fromkeys(test_df.columns) if name in dev_names]
    dev_df = dev_df.select(common_cols)
    test_df = test_df.select(common_cols)
    # Selecting columns does not change the number of rows, so the counts
    # taken in step 2.1 are still valid here.
    intersect_data = test_df.intersectAll(dev_df)
    intersect_count = intersect_data.count()
    if intersect_count != expected_count or intersect_count != output_count:
        print(
            '- completeness_test: Test Fail. \nTest_df has {} rows but intersect data between test_df and dev_df has {} rows'.format(
                expected_count, intersect_count))
        # exceptAll keeps duplicates, so a row that merely occurs more often
        # on one side is still listed as a difference.
        diff_expected_vs_intersect = test_df.exceptAll(intersect_data)
        diff_output_vs_intersect = dev_df.exceptAll(intersect_data)
        if diff_expected_vs_intersect.count() > 0:
            print(
                '- completeness_test: test_df data has these rows. But not has in dev_df data <Only show first 5 rows>:')
            diff_expected_vs_intersect.show(5, truncate=False)
        if diff_output_vs_intersect.count() > 0:
            print(
                '- completeness_test: dev_df data has these rows. But not has in test_df data <Only show first 5 rows>:')
            diff_output_vs_intersect.show(5, truncate=False)
    else:
        print('- completeness_test: Test Pass. \nData in test_df and dev_df are equals')

# Convert Timestamp

In [2]:
def convertTimestamp(test_df, colname, tz="PST"):
    """Replace ``colname`` with a timestamp rendered in the time zone ``tz``.

    The column value is divided by 1000, cast to ``TimestampType`` and then
    passed through ``from_utc_timestamp``.
    NOTE(review): the division suggests the column holds epoch milliseconds -
    confirm against the data.

    Args:
        test_df: DataFrame containing ``colname``.
        colname: Name of the column to convert; it is overwritten in the result.
        tz: Time zone handed to ``from_utc_timestamp``. Defaults to ``"PST"``.

    Returns:
        A new DataFrame with ``colname`` replaced by the converted timestamp.
    """
    scaled_value = col(colname) / 1000
    as_timestamp = scaled_value.cast(TimestampType())
    return test_df.withColumn(colname, from_utc_timestamp(as_timestamp, tz))

# Select data within a specific time

In [None]:
def select_data_between(df, start, end, colname):
    """Return the rows of ``df`` whose ``colname`` lies between ``start`` and ``end``.

    Args:
        df: DataFrame to filter.
        start: Lower bound as a string in ``'%Y-%m-%d %H:%M:%S.%f'`` format.
        end: Upper bound as a string in the same format.
        colname: Name of the column the bounds are applied to.

    Returns:
        The filtered DataFrame with all of the original columns.
    """
    timestamp_format = '%Y-%m-%d %H:%M:%S.%f'
    lower_bound = datetime.strptime(start, timestamp_format)
    upper_bound = datetime.strptime(end, timestamp_format)
    within_window = col(colname).between(lower_bound, upper_bound)
    return df.select("*").where(within_window)

# Check min datetime, max datetime when data transferred from stg-trans (get trans_kafka_timestamp column)

In [None]:
def select_data_max_min(test_df, colname):
    """Print the smallest and then the largest value of ``colname`` in ``test_df``.

    Both values come from one SQL aggregation, so null values are skipped and
    an empty frame prints ``None`` twice instead of raising.

    Args:
        test_df: DataFrame to inspect.
        colname: Name of the column whose extremes are printed.

    Returns:
        None. The two values are printed, minimum first.
    """
    # Imported locally under an alias: the notebook star-imports
    # pyspark.sql.functions, which makes bare min/max ambiguous with the builtins.
    from pyspark.sql import functions as F

    bounds = test_df.agg(F.min(colname), F.max(colname)).first()
    print(bounds[0])
    print(bounds[1])

# Get value of a specific column

In [None]:
def select_data_with_cols(test_df, dev_df, cols_name):
    """Compare only the given column(s) of two frames and print the differences.

    Both frames are projected onto ``cols_name`` and compared row by row,
    duplicates included. On a mismatch, up to five rows that exist in only one
    of the projections are shown.

    Args:
        test_df: DataFrame holding the expected data.
        dev_df: DataFrame holding the data to verify.
        cols_name: Column name, or list of column names, passed to ``select``.

    Returns:
        tuple: ``(selected_test_df, selected_dev_df)``, the two projections
        that were compared.
    """
    selected_test_in_dfs = test_df.select(cols_name)
    selected_dev_in_dfs = dev_df.select(cols_name)

    output_count = selected_dev_in_dfs.count()
    expected_count = selected_test_in_dfs.count()
    intersect_data = selected_test_in_dfs.intersectAll(selected_dev_in_dfs)
    intersect_count = intersect_data.count()

    print('size of intersect data: {} \n\t+ test_df count: {} \n\t+ dev_df count: {}'.format(intersect_count,
                                                                                             expected_count,
                                                                                             output_count))

    if intersect_count != expected_count or intersect_count != output_count:
        print('Test Fail. \nTest_df has {} rows but intersect data between test_df and dev_df has {} rows'.format(expected_count,
                                                                                                                  intersect_count))
        # Only look for differing rows when the counts disagree. exceptAll
        # keeps duplicates, so a row that merely occurs more often on one side
        # is still listed.
        diff_expected_vs_intersect = selected_test_in_dfs.exceptAll(intersect_data)
        diff_output_vs_intersect = selected_dev_in_dfs.exceptAll(intersect_data)

        if diff_expected_vs_intersect.count() > 0:
            print('test_df data has these rows. But not has in dev_df data <Only show first 5 rows>:')
            diff_expected_vs_intersect.show(5, truncate=False)

        if diff_output_vs_intersect.count() > 0:
            print('dev_df data has these rows. But not has in test_df data <Only show first 5 rows>:')
            diff_output_vs_intersect.show(5, truncate=False)
    else:
        print('Test Pass. \nData in test_df and dev_df are equals')

    return selected_test_in_dfs, selected_dev_in_dfs


In [None]:
# Table under comparison and the ADLS locations of its dev and test copies.
# NOTE(review): both paths are built from the same prefix, so dev and test
# currently point at the same location - confirm this is intended.
table_name = 'CS_ESTIMATE_DETAILS'
dev_df_path = '{}{}'.format(
    'abfss://data@adledevadls2storage.dfs.core.windows.net/servicerequest/regulated/trans/es/ptccon/',
    table_name)
test_df_path = '{}{}'.format(
    'abfss://data@adledevadls2storage.dfs.core.windows.net/servicerequest/regulated/trans/es/ptccon/',
    table_name)

In [None]:
# Earlier attempts at loading a filtered dev copy, kept for reference (not executed):
#dev_df_CS_ESTIMATE_DETAILS= load_local_trans_adls(dev_df_path, "parquet").where(("STAGE_KAFKA_TIMESTAMP") > "2021-06-27")
#dev_df_CS_ESTIMATE_DETAILS= load_local_trans_adls(dev_df_path, "parquet").where(("adl_ingest_time" > 2021-07-01-00) and ("adl_ingest_time" <= 2021-07-01-04))
#dev_df_CS_ESTIMATE_DETAILS= load_local_trans_adls(dev_df_path).filter(dev_df_CS_ESTIMATE_DETAILS.STAGE_KAFKA_TIMESTAMP > "2021-06-27").show(10)
# Load the dev copy using the loader's default format ("parquet").
dev_df_CS_ESTIMATE_DETAILS= load_local_trans_adls(dev_df_path)
# Create a temporary view (disabled)
#dev_df_CS_ESTIMATE_DETAILS.createOrReplaceTempView("ParquetTable")

In [None]:
# Load both copies as Parquet, then normalise missing values: the string
# 'null' is first replaced with a real null, and nulls are then filled with
# the string 'null'.
dev_df_CS_ESTIMATE_DETAILS = (
    load_local_trans_adls(dev_df_path, "parquet")
    .replace('null', None)
    .na.fill('null')
)
test_df_CS_ESTIMATE_DETAILS = (
    load_local_trans_adls(test_df_path, "parquet")
    .replace('null', None)
    .na.fill('null')
)

In [None]:
# Print the schema of the dev frame and then the test frame for a visual comparison.
dev_df_CS_ESTIMATE_DETAILS.printSchema()
test_df_CS_ESTIMATE_DETAILS.printSchema()

In [None]:
# Reload both copies as ORC (this overwrites the frames loaded above).
# The listed KAFKA_* / ADL_* and related columns are dropped from the dev copy
# only; both frames then get the same 'null' normalisation.
dev_df_CS_ESTIMATE_DETAILS = (
    load_local_trans_adls(dev_df_path, "orc")
    .drop(
        "KAFKA_OFFSET",
        "KAFKA_PARTITION",
        "KAFKA_TOPIC",
        "ADL_INGEST_CHANNEL",
        "ADL_TRANS_LOAD_TIME",
        "ADL_INGEST_TIME",
        "TABLE_NAME",
        "ADL_DATA_SOURCE",
        "ADL_INGEST_PARTITION",
        "INSTANCE_ID",
        "ADL_TRAN_CODE",
        "TRANS_KAFKA_TIMESTAMP",
    )
    .replace('null', None)
    .na.fill('null')
)
test_df_CS_ESTIMATE_DETAILS = (
    load_local_trans_adls(test_df_path, "orc")
    .replace('null', None)
    .na.fill('null')
)

In [None]:
# Inspect the dev frame: schema, the stage_kafka_timestamp column, and the row count.
dev_df_CS_ESTIMATE_DETAILS.printSchema()
dev_stage_timestamps = dev_df_CS_ESTIMATE_DETAILS.select('stage_kafka_timestamp')
dev_stage_timestamps.show(truncate=False)
dev_df_CS_ESTIMATE_DETAILS.count()

In [None]:
# Inspect the test frame: schema, the stage_kafka_timestamp column, and the row count.
test_df_CS_ESTIMATE_DETAILS.printSchema()
test_stage_timestamps = test_df_CS_ESTIMATE_DETAILS.select('stage_kafka_timestamp')
test_stage_timestamps.show(truncate=False)
test_df_CS_ESTIMATE_DETAILS.count()

In [None]:
# Print the min and max stage_kafka_timestamp of the test frame, then of the dev frame.
select_data_max_min(test_df_CS_ESTIMATE_DETAILS,"stage_kafka_timestamp")
select_data_max_min(dev_df_CS_ESTIMATE_DETAILS,"stage_kafka_timestamp")

In [None]:
# Time window for the comparison, written in the '%Y-%m-%d %H:%M:%S.%f' layout
# that select_data_between parses. Both names must be defined here because the
# next cell reads them; adjust the bounds to the period under test.
start = '2020-11-11 03:06:35.336000'
end = '2020-11-11 03:06:35.336000'

In [None]:
# Restrict both frames to the rows whose stage_kafka_timestamp lies between start and end.
select_test_df_between_CS_ESTIMATE_DETAILS = select_data_between(test_df_CS_ESTIMATE_DETAILS, start, end,"stage_kafka_timestamp")
select_dev_df_between_CS_ESTIMATE_DETAILS = select_data_between(dev_df_CS_ESTIMATE_DETAILS, start, end,"stage_kafka_timestamp")
#select_test_df_between.show(40, False)

In [None]:
# Number of dev rows inside the selected time window.
select_dev_df_between_CS_ESTIMATE_DETAILS.count()

In [None]:
# Number of test rows inside the selected time window.
select_test_df_between_CS_ESTIMATE_DETAILS.count()

In [None]:
# Number of test rows in the window after dropping exact duplicate rows.
select_test_df_between_CS_ESTIMATE_DETAILS.dropDuplicates().count()

In [None]:
# Run the completeness report on the two windowed frames (test first, dev second).
completeness_test(select_test_df_between_CS_ESTIMATE_DETAILS, select_dev_df_between_CS_ESTIMATE_DETAILS)

In [None]:
# Run the accuracy report on the two windowed frames (test first, dev second).
accuracy_test(select_test_df_between_CS_ESTIMATE_DETAILS, select_dev_df_between_CS_ESTIMATE_DETAILS)

In [None]:
# Show up to 40 windowed test rows without truncating the cell values.
select_test_df_between_CS_ESTIMATE_DETAILS.show( 40, truncate = False)

In [None]:
# Show up to 40 windowed dev rows without truncating the cell values.
select_dev_df_between_CS_ESTIMATE_DETAILS.show( 40, truncate = False)