## Test Cases for DML used by Apache Spark
**Spark SQL 3.3 DML Reference** 
https://spark.apache.org/docs/3.3.0/sql-ref-syntax.html#dml-statements

To store the test results, configure the <mark>**storage account and container**</mark>.

Configure `DIRECTORY_PATH` for the Insert Overwrite tests.

In [None]:
# Install the XML test runner (imported as xmlrunner) and the xmltodict parser used by later cells.
!pip install unittest-xml-reporting xmltodict

## Configure Result Storage Location

In [None]:
# Storage account and container that should receive the result files.
# Both are empty by default and must be filled in to persist results.
storage_account = ""
result_container = ""

## Initialize Common Variables for the test run

In [None]:
import time

# Fixed identifiers for this suite and its result files; leave these unchanged.
TEST_SUITE = "SPARK_SQL_DML"
RESULT_FILE_NAME = "dml_test_result.parquet"
RAW_RESULT_FILE_NAME = "raw_dml_test_result.parquet"

# Run identifier: current epoch time rounded to whole milliseconds.
TEST_RUN_ID = round(time.time() * 1000)

# Name of the platform under test.
PLATFORM = "nameoftheplatform"

# Table names are prefixed with the platform name and suffixed with the run id.
PREFIX = PLATFORM
SUFFIX = TEST_RUN_ID
# Short alias for running Spark SQL statements through the notebook's session.
sql = spark.sql

### Configure Directory Path for Insert Overwrite 

In [None]:
# Base directory under which the INSERT OVERWRITE DIRECTORY tests write their output.
DIRECTORY_PATH = "Files/DML/InsertOverWriteTest"

### Set Common Spark Configurations

In [None]:
# Switch Hive dynamic partitioning to nonstrict mode.
# NOTE(review): presumably needed so INSERTs without a static partition value are accepted - confirm.
sql("set hive.exec.dynamic.partition.mode=nonstrict")

## DML - Insert Table

In [None]:
import unittest

class DMLInsertTableTest(unittest.TestCase):
    """Exercise Spark SQL INSERT, INSERT OVERWRITE DIRECTORY and LOAD DATA statements.

    A partitioned source table is created once for the whole class and dropped
    when the class finishes. Each test reports a failure with a message that is
    the string form of a dict holding ``command`` and ``status`` keys.

    The tests depend on each other: ``test_dml_insert_001_values`` populates the
    source table that the later tests read from, so the default alphabetical
    test ordering must be kept.
    """

    # Source table shared by every test in this class.
    table_name=f"{PREFIX}_student_insert_table_{SUFFIX}"
    
    @classmethod
    def setUpClass(cls):
        """Create the partitioned source table used by all tests.

        Raises:
            AssertionError: (``cls.failureException``) if the table cannot be created.
        """
        table_name_sql = f"CREATE TABLE {cls.table_name} (id INT, name STRING) \
                        PARTITIONED BY (age INT)"
        try:
            sql(table_name_sql)
        except Exception as ex:
            msg={'command':'InsertTable Setup failed','status':'fail'}
            # TestCase.fail is an instance method and cannot be called on the
            # class, so raise the failure exception directly.
            raise cls.failureException(f"{msg}") from ex

    def test_dml_insert_001_values(self):
        """insert values to a table

        Inserts three rows into the source table and checks the row count.
        """
        try:
            sql(f"INSERT INTO {self.table_name} VALUES \
                 (1,'a',10),(2,'b',20),(3,'c',30);")
            record_count=sql(f"SELECT * FROM {self.table_name}").count()
            self.assertEqual(record_count,3)
        except Exception:
            # Any error, including a failed assertion, is reported with the
            # structured message below.
            msg={'command':'INSERT USING VALUES','status':'fail'}
            self.fail(f"{msg}")

    def test_dml_insert_002_using_select(self):
        """insert values using select

        Copies one row from the source table into a static partition of a
        temporary table and checks the row count.
        """
        person_table_name=f"{PREFIX}_person_insert_table_{SUFFIX}"
        try:
            person_table_name_sql = f"CREATE TABLE {person_table_name} (id INT, name STRING, age INT) \
                        PARTITIONED BY (student_id INT)"
            sql(person_table_name_sql)
            sql(f"INSERT INTO {person_table_name} PARTITION (student_id=1234) \
                        SELECT id, name,age FROM {self.table_name} WHERE id=1")
            record_count=sql(f"SELECT * FROM {person_table_name}").count()
            self.assertEqual(record_count,1)
        except Exception:
            msg={'command':'INSERT INTO PARTITION SELECT','status':'fail'}
            self.fail(f"{msg}")
        finally:
            sql(f"DROP TABLE IF EXISTS {person_table_name}")

    def test_dml_insert_003_using_table(self):
        """insert using table

        Copies the whole source table with ``INSERT INTO ... TABLE`` and checks
        that all three rows arrive.
        """
        table_name_copy=f"{PREFIX}_student_insert_table_copy_{SUFFIX}"
        try:
            table_name_copy_sql = f"CREATE TABLE {table_name_copy} (id INT, name STRING) \
                        PARTITIONED BY (age INT)"
            sql(table_name_copy_sql)
            sql(f"INSERT INTO {table_name_copy} TABLE {self.table_name}")
            record_count=sql(f"SELECT * FROM {table_name_copy}").count()
            self.assertEqual(record_count,3)
        except Exception:
            msg={'command':'INSERT INTO TABLE','status':'fail'}
            self.fail(f"{msg}")
        finally:
            # Single cleanup point; runs on success and on failure.
            sql(f"DROP TABLE IF EXISTS {table_name_copy}")

    def test_dml_insert_004_using_from(self):
        """insert using from

        Uses the ``INSERT INTO ... FROM ... SELECT`` form to copy one row.
        """
        table_name_cp_from=f"{PREFIX}_student_insert_table_cp_from_{SUFFIX}"
        try:
            table_name_copy_sql = f"CREATE TABLE {table_name_cp_from} (id INT, name STRING) \
                        PARTITIONED BY (age INT)"
            sql(table_name_copy_sql)
            sql(f"INSERT INTO {table_name_cp_from} FROM {self.table_name} SELECT id, name,age WHERE id=1")
            record_count=sql(f"SELECT * FROM {table_name_cp_from}").count()
            self.assertEqual(record_count,1)
        except Exception:
            msg={'command':'INSERT INTO FROM SELECT','status':'fail'}
            self.fail(f"{msg}")
        finally:
            sql(f"DROP TABLE IF EXISTS {table_name_cp_from}")

    def test_dml_insert_005_with_column_list_with_part_spec(self):
        """insert with both a partition spec and a column list"""
        table_name_col_part=f"{PREFIX}_student_insert_table_col_list_{SUFFIX}"
        try:
            table_name_col_part_sql = f"CREATE TABLE {table_name_col_part} (id INT, name STRING) \
                        PARTITIONED BY (age INT)"
            sql(table_name_col_part_sql)
            sql(f"INSERT INTO {table_name_col_part} PARTITION (age=20) (id,name) VALUES (1,'a'),(2,'b')")
            record_count=sql(f"SELECT * FROM {table_name_col_part}").count()
            self.assertEqual(record_count,2)
        except Exception:
            msg={'command':'INSERT With Column List and Partition Spec','status':'fail'}
            self.fail(f"{msg}")
        finally:
            sql(f"DROP TABLE IF EXISTS {table_name_col_part}")

    def test_dml_insert_006_with_typed_date_part_col(self):
        """insert Using a Typed Date Literal for a Partition Column Value"""
        table_name_typed_date_part=f"{PREFIX}_student_insert_table_date_part_{SUFFIX}"
        try:
            table_name_copy_sql = f"CREATE TABLE {table_name_typed_date_part} (id INT, name STRING) \
                        PARTITIONED BY (birthday DATE)"
            sql(table_name_copy_sql)
            sql(f"INSERT INTO {table_name_typed_date_part} PARTITION (birthday = date'2019-01-02') (id,name) VALUES (1,'a'),(2,'b')")
            record_count=sql(f"SELECT * FROM {table_name_typed_date_part}").count()
            self.assertEqual(record_count,2)
        except Exception:
            msg={'command':'INSERT Using a Typed Date Literal for a Partition Column Value','status':'fail'}
            self.fail(f"{msg}")
        finally:
            sql(f"DROP TABLE IF EXISTS {table_name_typed_date_part}")

    def test_dml_insert_007_overwrite_spark_format(self):
        """insert overwrite for Spark format

        Writes the source table as parquet to the output path first, then
        overwrites that directory with ``INSERT OVERWRITE DIRECTORY``. The test
        passes when neither step raises.
        """
        output_path = f"{DIRECTORY_PATH}/{TEST_RUN_ID}/spark"
        try:
            df = sql(f"SELECT * FROM {self.table_name}")
            df.write.parquet(output_path)
            sql_cmd = f"INSERT OVERWRITE DIRECTORY \
                    USING parquet \
                    OPTIONS ('path' '{output_path}') \
                  SELECT * FROM {self.table_name};"
            sql(sql_cmd)
        except Exception:
            msg={'command':'INSERT OVERWRITE DIRECTORY USING parquet','status':'fail'}
            self.fail(f"{msg}")

    def test_dml_insert_008_overwrite_hive_format(self):
        """insert overwrite for hive format

        Writes the source table as parquet to the output path first, then
        overwrites that directory with ``INSERT OVERWRITE LOCAL DIRECTORY``
        stored as orc. The test passes when neither step raises.
        """
        output_path = f"{DIRECTORY_PATH}/{TEST_RUN_ID}/hive"
        try:
            df = sql(f"SELECT * FROM {self.table_name}")
            df.write.parquet(output_path)
            sql_cmd = f"INSERT OVERWRITE LOCAL DIRECTORY '{output_path}'\
                    STORED AS orc \
                  SELECT * FROM {self.table_name};"
            sql(sql_cmd)
        except Exception:
            msg={'command':'INSERT OVERWRITE LOCAL DIRECTORY STORED AS orc','status':'fail'}
            self.fail(f"{msg}")

    def test_dml_insert_009_load_into_table(self):
        """insert LOAD INTO TABLE

        Loads the directory written by ``test_dml_insert_008_overwrite_hive_format``
        into a new Hive table and checks the row count.
        """
        input_path = f"{DIRECTORY_PATH}/{TEST_RUN_ID}/hive"
        table_name_load=f"{PREFIX}_student_insert_table_load_{SUFFIX}"
        table_sql = f"CREATE TABLE {table_name_load} (id INT, name STRING, age INT) USING Hive"
        try:
            sql(table_sql)
            sql_cmd = f"LOAD DATA LOCAL INPATH '{input_path}' OVERWRITE INTO TABLE {table_name_load}"
            sql(sql_cmd)
            # The count must be bound to the name the assertion reads;
            # otherwise a NameError makes this test fail unconditionally.
            record_count = sql(f"SELECT * FROM {table_name_load}").count()
            # NOTE(review): expected value kept from the original; the source
            # table holds three rows after test 001 - confirm that 2 is intended.
            self.assertEqual(record_count,2)
        except Exception:
            msg={'command':'LOAD DATA LOCAL INTO TABLE','status':'fail'}
            self.fail(f"{msg}")
        finally:
            sql(f"DROP TABLE IF EXISTS {table_name_load}")

    @classmethod
    def tearDownClass(cls):
        """Drop the shared source table once all tests in the class have run."""
        sql(f"DROP TABLE IF EXISTS {cls.table_name}")


# TODO: Add test cases for Hive complex data types, Avro and other formats (original note was left unfinished).

### Execute Test Case

In [None]:
import io
import xmlrunner
# Collect every test method of DMLInsertTableTest into one suite.
loader = unittest.TestLoader()
suite = unittest.TestSuite(loader.loadTestsFromTestCase(DMLInsertTableTest))

# Run the suite and capture the XML report in an in-memory buffer.
out = io.BytesIO()
runner = xmlrunner.XMLTestRunner(output=out)
result = runner.run(suite)

## Report for Test

In [None]:
from pyspark.sql.functions import col, explode,isnull,from_json, expr, to_json, coalesce, lit
from pyspark.sql.types import StructType,StructField,StringType
import json
import xmltodict

# Parse the XML report captured from the test runner into nested dicts/lists.
dict_result = xmltodict.parse(out.getvalue())
# Round-trip through JSON text to strip the '@' that xmltodict puts in front of
# XML attribute names. Note that this removes every '@' in the report,
# including any that appear inside values.
json_result = json.loads(json.dumps(dict_result,indent=4).replace('@',''))
test_suites = json_result['testsuites']['testsuite']

# spark.read.json expects JSON strings. Serialise explicitly instead of relying
# on the string form of a Python dict, which is not valid JSON once a value is
# None or a bool.
df = spark.read.json(sc.parallelize([json.dumps(test_suites)]))

# Shape of the failure message produced by the tests.
fail_schema = StructType([
    StructField("command", StringType(), True),
    StructField("status", StringType(), True),
])

# One row per test case.
test_cases_df = df.withColumn('ts', explode('testcase')).drop(col('testcase'))
schema_string = test_cases_df.schema.simpleString()
if "failure:" in schema_string:
    # At least one test failed: decode the structured failure message.
    explode_df = test_cases_df.withColumn(
        'fail', from_json(col('ts.failure.message'), fail_schema)
    ).drop(col('ts.failure'))
else:
    # No failures in this run: synthesise a 'pass' entry with the same schema.
    explode_df = test_cases_df.withColumn(
        "fail",
        from_json(expr("to_json(named_struct('command', '', 'status', 'pass'))"), fail_schema),
    )

# Flatten suite-level and test-case-level fields into the final result layout.
df_test_result = explode_df.select(
    col("errors").alias("errorInSuite"),
    col("failures").alias("failedInSuite"),
    col("name").alias("suitename"),
    "skipped",
    col("tests").alias("totalTest"),
    col("timestamp").alias("executionTime"),
    col("ts.name").alias("testCaseName"),
    col("ts.time").alias("testCaseTime"),
    coalesce(col("fail.command"), lit("")).alias("failcommand"),
    coalesce(col("fail.status"), lit("pass")).alias("status"),
)

if storage_account and result_container:
    # save result to storage
    storage_path = f"abfs://{result_container}@{storage_account}.dfs.core.windows.net/{TEST_RUN_ID}/{PLATFORM}/{TEST_SUITE}"
    # write raw results
    df.write.parquet(f"{storage_path}/{RAW_RESULT_FILE_NAME}")
    # write transformed results
    df_test_result.write.parquet(f"{storage_path}/{RESULT_FILE_NAME}")
else:
    # No storage configured: show the results in the notebook instead.
    print("configure storage path to store results")
    df_test_result.show(200,False)