# Imports, Methods, and Variable Declarations

In [1]:
from pyspark.sql.functions import col,concat
import pandas as pd
from pyspark.sql import SparkSession
from datetime import datetime
from pathlib import Path
import warnings
import boto3
# Silence every Python warning for the rest of the run.
warnings.filterwarnings("ignore")
# Resolve AWS credentials through a default boto3 session; get_spark() passes
# the resulting key pair to the S3A connector.
session = boto3.Session()
credentials = session.get_credentials()
# NOTE(review): get_credentials() presumably returns None when no credentials
# are configured, which would make the attribute access below fail - confirm.
access_key = credentials.access_key
secret_key = credentials.secret_key

In [2]:
def get_spark():
    """
    Build (or reuse, via ``getOrCreate``) the local SparkSession used to read
    data from the S3 bucket.

    All S3A options are passed with the ``spark.hadoop.`` prefix so that Spark
    forwards them to the Hadoop configuration used by the S3A connector.
    The static key pair comes from the module-level ``access_key`` and
    ``secret_key`` values.

    :return: SparkSession configured for ``s3a://`` access
    """
    spark = (
        SparkSession
            .builder
            .master("local[*]")
            .config('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.2.2')
            .config("spark.hadoop.fs.s3a.access.key", access_key)
            .config("spark.hadoop.fs.s3a.secret.key", secret_key)
            .config('spark.hadoop.fs.s3a.aws.credentials.provider',
                    'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider')
            # The S3A endpoint option is "fs.s3a.endpoint"; the previous
            # "fs.s3a.endpoint.key" is not an option the connector reads.
            .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com")
            .config("spark.executor.memory", "70g")
            .config("spark.driver.memory", "50g")
            .config("spark.memory.offHeap.enabled", True)
            .config("spark.memory.offHeap.size", "32g")
            .getOrCreate()
    )
    return spark


def compare_validate_schema(df_ms, df):
    """
    Validate schema by comparing mapping spec and Dataframe schema.

    Every column that appears in either the mapping spec or the DataFrame is
    listed with its expected type (``ms``) and its actual type (``s3``).
    A column missing on one side gets ``None`` on that side and is therefore
    reported as a difference instead of raising.

    :param df_ms: Pandas DataFrame, expected schema; must have the columns
                  ``column_name`` and ``data_type``
    :param df: Spark DataFrame
    :return: Pandas DataFrame (columns ``column_name``, ``ms``, ``s3``) holding
             only the mismatching columns; empty when the schemas agree
    """
    # The first data_type listed for a column name wins.
    expected_types = {}
    for name, data_type in zip(df_ms['column_name'], df_ms['data_type']):
        expected_types.setdefault(name, data_type)

    actual_columns = list(df.columns)
    actual_types = {name: str(df.schema[name].dataType) for name in actual_columns}

    # Union of both column sets: DataFrame columns first, then spec-only ones.
    all_columns = list(dict.fromkeys(actual_columns + list(expected_types)))

    rows = [
        {'column_name': name,
         'ms': expected_types.get(name),
         's3': actual_types.get(name)}
        for name in all_columns
    ]
    df_result = pd.DataFrame(rows, columns=['column_name', 'ms', 's3'])

    df_diff = df_result[df_result.isna().any(axis=1) | (df_result['ms'] != df_result['s3'])]
    print('========= The difference between MS and S3 =====')
    print(df_diff)
#     assert len(df_diff) == 0, f'The MS and Data Schema does not match!!!'
    return df_diff


def validate_pk(spark, df, pk_column_list):
    """
    Checks PK of dataframe.

    Registers ``df`` as the temp view ``table``, groups by the PK columns and
    keeps the groups that occur more than once. The group sizes are shown and
    their total is printed.

    :param spark: Spark
    :param df: Dataframe
    :param pk_column_list: PK column list of the Dataframe
    :return: total number of rows that belong to a duplicated PK group
             (0 when the PK is unique)
    """
    df.createOrReplaceTempView('table')
    pk_columns_str = ','.join(pk_column_list)
    query = f'select count(*) as duplicated_row_count from table group by {pk_columns_str} having count(*) > 1'
    # df.groupBy(*pk_column_list).count().where(col('count') > 1) >> same function different way
    result = spark.sql(query)
    # Summing an empty result yields NULL (None in Python); report it as 0
    # so the message does not read "None duplicated rows found".
    duplicated_rows = result.groupby().sum().collect()[0][0] or 0
    result.show()
    print(f'{duplicated_rows} duplicated rows found!!!')
#     assert duplicated_rows == 0, f'{duplicated_rows} duplication found!!!'
    return duplicated_rows


def validate_not_null_columns(spark, df, not_null_column_list):
    """
    Checks not null for columns.

    Registers ``df`` as the temp view ``table`` and counts the NULLs of every
    listed column in a single query. The per-column counts are shown and
    their total is printed.

    :param spark: Spark
    :param df: DataFrame
    :param not_null_column_list: list of not-null columns
    :return: total number of NULL values found across the listed columns
    """
    # With nothing to check the generated SQL would be invalid ("select  from table").
    if len(not_null_column_list) == 0:
        print('0 null values found!!!')
        return 0

    df.createOrReplaceTempView('table')
    nn_columns = [
        f'sum(case when {column} is null then 1 else 0 end) as {column} '
        for column in not_null_column_list
    ]
    query = 'select ' + ','.join(nn_columns) + ' from table'
    result = spark.sql(query)
    # Each sum is NULL (None) when the table has no rows; treat that as 0.
    sum_null_values = sum(value or 0 for value in result.collect()[0])
    result.show()
    print(f'{sum_null_values} null values found!!!')
#     assert sum_null_values == 0, f'{sum_null_values} null values found!!!'
    return sum_null_values

def validate_column_has_supported_values(df, column, supported_values):
    """
    Report the distinct non-null values of a column that are not supported.

    The offending values are only printed; nothing is returned or raised.

    :param df: Spark dataframe
    :param column: column_name
    :param supported_values: supported values list
    :return:
    """
    distinct_non_null = df.select(column).filter(f'{column} is not Null').distinct()
    # Unwrap each single-column row into its bare value.
    column_values_list = distinct_non_null.rdd.map(lambda row: row[0]).collect()
    unexpected = set(column_values_list) - set(supported_values)
    unexpected_values_list = list(unexpected)
    print(f'list of unexpected values from column: {unexpected_values_list}')
#     assert len(unexpected_values_list) == 0, f'{column} has unexpected values: {unexpected_values_list}!!!'

def validate_max_length_of_column(df, column, max_length):
    """
    Count the distinct non-null values of a column that exceed a char length.

    The distinct values are collected to the driver and measured with
    ``len``; the number of over-long values is printed.

    :param df: Dataframe
    :param column: column name
    :param max_length: expected max length
    :return:
    """
    distinct_values = (
        df.filter(f'{column} is not Null')
        .select(column)
        .distinct()
        .rdd.map(lambda row: row[0])
        .collect()
    )
    count_outlier = sum(1 for value in distinct_values if len(value) > max_length)
    print(f'{count_outlier} outlier found!!!')
#     assert count_outlier == 0, f'{count_outlier} outliers found!!!'


def validate_column_value_greater_than(df, column, greater_than):
    """
    Count rows whose column value is not strictly greater than a threshold.

    A row is an outlier when its value equals or is below ``greater_than``.
    Null values are skipped here because not-null is checked in another step.
    The outlier count is printed.

    :param df: dataframe
    :param column: column name of the dataframe
    :param greater_than: value the column is expected to be greater than
    :return:
    """
    non_null_rows = df.filter(f'{column} is not Null')
    offending_rows = non_null_rows.where(
        f'{column} = {greater_than} or {column} < {greater_than}')
    count_outlier = offending_rows.count()
    print(f'{count_outlier} outlier found!!!')
#     assert count_outlier == 0, f'{count_outlier} outliers found!!! column: {column}, greater_than: {greater_than}'


def validate_str_date_format(date_text, date_format):
    """
    Tell whether date_text is written exactly in date_format.

    The text is parsed with the format and rendered back; it only counts as
    a match when the round trip reproduces the input unchanged.

    :param date_text: date to be validated
    :param date_format: format to be compared
    :return: True if format matches, False otherwise
    """
    try:
        round_trip = datetime.strptime(date_text, date_format).strftime(date_format)
    except ValueError:
        return False
    return round_trip == date_text


def validate_date_format_for_column(df, column, date_format):
    """
    Validates date format of every distinct non-null value of a column.

    Each value is converted with ``str`` and checked by
    ``validate_str_date_format``. Null values are skipped because not-null is
    checked in another step.

    :param df: dataframe
    :param column: column to be checked
    :param date_format: string format of the date
    :return:
    :raises AssertionError: on the first value that does not match date_format
    """
    distinct_dates = (
        df.select(column)
        .filter(f'{column} is not Null')
        .distinct()
        .rdd.map(lambda r: r[0])
        .collect()
    )
    for dt in distinct_dates:
        # Raised explicitly (not via `assert`) so the check still runs under `python -O`.
        if not validate_str_date_format(str(dt), date_format):
            raise AssertionError(f'date format {dt} doesnt match to {date_format}!!!')
    print('All dates date format matches!!! Null values are skipped in that check, as that check done in previous steps!!!')


def validate_min_value_of_column(df, column, min_value):
    """
    Count rows whose column value is below the expected minimum.

    Null values are skipped here because not-null is checked in another step.
    The outlier count is printed.

    :param df: dataframe
    :param column: column name of the dataframe
    :param min_value: min value to be expected
    :return:
    """
    non_null_rows = df.filter(f'{column} is not Null')
    too_small = non_null_rows.where(
        f'{column} < {min_value}')
    count_outlier = too_small.count()
    print(f'{count_outlier} outlier found!!!')
#     assert count_outlier == 0, f'{count_outlier} outliers found!!! column: {column}, min_value: {min_value}'


def validate_max_value_of_column(df, column, max_value):
    """
    Count rows whose column value is above the expected maximum.

    Null values are skipped here because not-null is checked in another step.
    The outlier count is printed.

    :param df: dataframe
    :param column: column name of the dataframe
    :param max_value: max value to be expected
    :return:
    """
    non_null_rows = df.filter(f'{column} is not Null')
    too_large = non_null_rows.where(
        f'{column} > {max_value}')
    count_outlier = too_large.count()
    print(f'{count_outlier} outlier found!!!')
#     assert count_outlier == 0, f'{count_outlier} outliers found!!! column: {column}, max_value: {max_value}'


def get_path_as_string():
    """
    Return the part of this file's path that precedes '/e2e/', with a trailing '/'.

    When the path contains no '/e2e/' segment the whole path is returned,
    followed by '/'.

    :return: path
    """
    this_file = str(Path(__file__))
    base, _, _ = this_file.partition('/e2e/')
    return base + '/'


def add_path_to_base_path(path_to_be_added):
    """
    Append a relative path to the base path returned by get_path_as_string().

    :param path_to_be_added: should start from e2e/
    :return: new path with string format
    """
    base_path = get_path_as_string()
    return base_path + path_to_be_added


In [3]:
# Name of the S3 bucket that holds the datasets under test.
source_bucket = 'adthena.data.qa.test'

# Allowed values per column; looked up by column name in the supported-values checks.
supported_values_for_columns = {
    'device': ['desktop', 'mobile']
}

# strftime/strptime pattern that the date columns are validated against.
date_format = '%Y-%m-%d'

# Locations of the mapping-spec (expected schema) Excel files.
# NOTE(review): these are absolute paths on one developer machine - confirm
# whether they should be derived with add_path_to_base_path() instead.
sa_ms_loc = '/Users/rtrn/PycharmProjects/AA_Task/test/e2e/data/scrape_appearances_ms.xlsx'
ca_ms_loc = '/Users/rtrn/PycharmProjects/AA_Task/test/e2e/data/competitor_appearances_ms.xlsx'



In [4]:
# Mapping spec (expected schema) of scrape_appearances, loaded as a pandas DataFrame
sa_ms_df = pd.read_excel(sa_ms_loc)

# Mapping spec (expected schema) of competitor_appearances, loaded as a pandas DataFrame
ca_ms_df = pd.read_excel(ca_ms_loc)

In [5]:
# Display the expected schema (mapping spec) of scrape_appearances
sa_ms_df

Unnamed: 0,column_name,data_type,pk,not_null
0,search_term,StringType,Y,Y
1,device,StringType,Y,Y
2,date,DateType,Y,Y
3,scrape_count,IntegerType,,Y


In [6]:
# Display the expected schema (mapping spec) of competitor_appearances
ca_ms_df

Unnamed: 0,column_name,data_type,pk,not_null
0,search_term,StringType,Y,Y
1,device,StringType,Y,Y
2,date,DateType,Y,Y
3,domain,StringType,Y,Y
4,sponsored_appearances,IntegerType,,Y
5,natural_appearances,IntegerType,,Y
6,pla_appearances,IntegerType,,Y
7,ctr,DoubleType,,


# 1) Scrape Appearances Quality Checks

In [7]:
# Load the scrape_appearances parquet files from the source bucket into a
# PySpark DataFrame, then print its schema and the first 5 rows.
# The bucket name comes from source_bucket instead of being repeated here.
sa_df = get_spark().read.parquet(f's3a://{source_bucket}/scrape_appearances/*.parquet')
sa_df.printSchema()
sa_df.show(5)



:: loading settings :: url = jar:file:/Users/rtrn/Library/spark-3.2.1-bin-hadoop3.2/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/rtrn/.ivy2/cache
The jars for the packages stored in: /Users/rtrn/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-ad792f40-184d-43ab-81da-ea4c9f436a5e;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.2.2 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.563 in central
:: resolution report :: resolve 282ms :: artifacts dl 6ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.11.563 from central in [default]
	org.apache.hadoop#hadoop-aws;3.2.2 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	----------------------------------

root
 |-- search_term: string (nullable = true)
 |-- date: date (nullable = true)
 |-- device: string (nullable = true)
 |-- scrape_count: integer (nullable = true)



[Stage 1:>                                                          (0 + 1) / 1]

+--------------------+----------+-------+------------+
|         search_term|      date| device|scrape_count|
+--------------------+----------+-------+------------+
|$100 000 personal...|2022-05-13|desktop|           1|
|$135 nike men's b...|2022-05-13|desktop|           1|
|$199 michael star...|2022-05-13|desktop|           1|
|          $2000 loan|2022-05-13|desktop|           3|
|             $250.63|2022-05-13|desktop|           1|
+--------------------+----------+-------+------------+
only showing top 5 rows



                                                                                

In [8]:
# Total number of records in the scrape_appearances DataFrame
sa_df.count()

                                                                                

7143663

In [9]:
# Primary-key columns of scrape_appearances: mapping-spec rows flagged pk == 'Y'
pk_column_list = sa_ms_df[sa_ms_df['pk'] == 'Y']['column_name'].values.tolist()
pk_column_list

['search_term', 'device', 'date']

In [10]:
# Not-null columns of scrape_appearances: mapping-spec rows flagged not_null == 'Y'
not_null_column_list = sa_ms_df[sa_ms_df['not_null'] == 'Y']['column_name'].values.tolist()
not_null_column_list

['search_term', 'device', 'date', 'scrape_count']

In [11]:
# Compare the expected schema (mapping spec) with the actual DataFrame schema and print the differences
compare_validate_schema(sa_ms_df,sa_df)

Empty DataFrame
Columns: [column_name, ms, s3]
Index: []


In [12]:
# PK check: report rows of scrape_appearances that share the same primary-key values
validate_pk(get_spark(),sa_df,pk_column_list)

                                                                                

+--------------------+
|duplicated_row_count|
+--------------------+
|                   2|
+--------------------+

2 duplicated rows found!!!


In [13]:
# Null check: count NULL values in every not-null column of scrape_appearances
validate_not_null_columns(get_spark(), sa_df, not_null_column_list)



+-----------+------+----+------------+
|search_term|device|date|scrape_count|
+-----------+------+----+------------+
|          0|     0|   2|           1|
+-----------+------+----+------------+

3 null values found!!!


                                                                                

In [14]:
# Check whether the device column contains only supported values
supported_values = supported_values_for_columns['device']
validate_column_has_supported_values(sa_df,'device',supported_values)

[Stage 24:>                                                       (0 + 10) / 10]

list of unexpected values from column: ['tablet']


                                                                                

In [15]:
# Count the records whose device column holds the unexpected value 'tablet'
count_unexpected_tablet_values = sa_df.filter(sa_df.device == 'tablet').count()
print(f'total {count_unexpected_tablet_values} values exist as "tablet" in device column')

[Stage 27:>                                                       (0 + 10) / 10]

total 2 values exist as "tablet" in device column




In [16]:
# Max char length of the search_term column: values longer than 400 chars are outliers
validate_max_length_of_column(sa_df, 'search_term',400)

                                                                                

0 outlier found!!!


In [17]:
# scrape_count must be strictly greater than 0; values equal to or below 0 are outliers
validate_column_value_greater_than(sa_df,'scrape_count',0)

[Stage 33:>                                                       (0 + 10) / 10]

2 outlier found!!!


                                                                                

In [18]:
# Date column format check; null values are skipped because they are checked in a previous step
validate_date_format_for_column(sa_df, 'date', date_format)

[Stage 36:>                                                       (0 + 10) / 10]

All dates date format matches!!! Null values are skipped in that check, as that check done in previous steps!!!




# 2) Competitor Appearances Quality Checks

In [19]:
# Load the competitor_appearances parquet files from the source bucket into a
# PySpark DataFrame, then print its schema and the first 5 rows.
# The bucket name comes from source_bucket instead of being repeated here.
ca_df = get_spark().read.parquet(f's3a://{source_bucket}/competitor_appearances/*.parquet')
ca_df.printSchema()
ca_df.show(5)



root
 |-- search_term: string (nullable = true)
 |-- date: date (nullable = true)
 |-- device: string (nullable = true)
 |-- domain: string (nullable = true)
 |-- sponsored_appearances: integer (nullable = true)
 |-- natural_appearances: integer (nullable = true)
 |-- pla_appearances: integer (nullable = true)
 |-- ctr: double (nullable = true)



[Stage 41:>                                                         (0 + 1) / 1]

+--------------------+----------+-------+--------------------+---------------------+-------------------+---------------+-------------------+
|         search_term|      date| device|              domain|sponsored_appearances|natural_appearances|pla_appearances|                ctr|
+--------------------+----------+-------+--------------------+---------------------+-------------------+---------------+-------------------+
|       samuel jarvis|2022-05-14|desktop|   jarvisarchives.ca|                    0|                  1|              0|               null|
|san diego charger...|2022-05-14|desktop|    cheaptickets.com|                    0|                  3|              0|               null|
|       sand blasting|2022-05-14|desktop|       wikipedia.org|                    0|                  3|              0|               null|
|sandals for beach...|2022-05-14|desktop|          zappos.com|                    0|                  0|              1|0.05480628460645676|
|santa barbar

                                                                                

In [20]:
# Total number of records in the competitor_appearances DataFrame
ca_df.count()

                                                                                

92805679

In [21]:
# Primary-key columns of competitor_appearances: mapping-spec rows flagged pk == 'Y'.
# This rebinds pk_column_list, which previously held the scrape_appearances PK.
pk_column_list = ca_ms_df[ca_ms_df['pk'] == 'Y']['column_name'].values.tolist()
pk_column_list

['search_term', 'device', 'date', 'domain']

In [22]:
# Not-null columns of competitor_appearances: mapping-spec rows flagged not_null == 'Y'.
# This rebinds not_null_column_list, which previously held the scrape_appearances columns.
not_null_column_list = ca_ms_df[ca_ms_df['not_null'] == 'Y']['column_name'].values.tolist()
not_null_column_list

['search_term',
 'device',
 'date',
 'domain',
 'sponsored_appearances',
 'natural_appearances',
 'pla_appearances']

In [23]:
# Compare the expected schema (mapping spec) with the actual DataFrame schema and print the differences
compare_validate_schema(ca_ms_df,ca_df)

Empty DataFrame
Columns: [column_name, ms, s3]
Index: []


In [24]:
# PK check: report rows of competitor_appearances that share the same primary-key values
validate_pk(get_spark(),ca_df,pk_column_list)



+--------------------+
|duplicated_row_count|
+--------------------+
+--------------------+

None duplicated rows found!!!




In [25]:
# Null check: count NULL values in every not-null column of competitor_appearances
validate_not_null_columns(get_spark(), ca_df, not_null_column_list)



+-----------+------+----+------+---------------------+-------------------+---------------+
|search_term|device|date|domain|sponsored_appearances|natural_appearances|pla_appearances|
+-----------+------+----+------+---------------------+-------------------+---------------+
|          0|     0|   0|     1|                    0|                  0|              0|
+-----------+------+----+------+---------------------+-------------------+---------------+

1 null values found!!!


                                                                                

In [26]:
# Check whether the device column contains only supported values
supported_values = supported_values_for_columns['device']
validate_column_has_supported_values(ca_df,'device',supported_values)



list of unexpected values from column: []


                                                                                

In [27]:
# Max char length of the search_term column: values longer than 400 chars are outliers
validate_max_length_of_column(ca_df, 'search_term',400)

                                                                                

0 outlier found!!!


In [28]:
# Date column format check; null values are skipped because they are checked in a previous step
validate_date_format_for_column(ca_df, 'date', date_format)



All dates date format matches!!! Null values are skipped in that check, as that check done in previous steps!!!


                                                                                

In [29]:
# Max char length of the domain column: values longer than 400 chars are outliers
validate_max_length_of_column(ca_df, 'domain',400)

                                                                                

0 outlier found!!!


In [30]:
# sponsored_appearances must be >= 0; values below 0 are outliers
validate_min_value_of_column(ca_df,'sponsored_appearances',0)



0 outlier found!!!


                                                                                

In [31]:
# natural_appearances must be >= 0; values below 0 are outliers
validate_min_value_of_column(ca_df,'natural_appearances',0)



0 outlier found!!!


                                                                                

In [32]:
# pla_appearances must be >= 0; values below 0 are outliers
validate_min_value_of_column(ca_df,'pla_appearances',0)



0 outlier found!!!


                                                                                

In [33]:
# ctr must be >= 0; values below 0 are outliers
validate_min_value_of_column(ca_df,'ctr',0)



1 outlier found!!!


                                                                                

In [34]:
# ctr must be <= 1.0; values above 1.0 are outliers
validate_max_value_of_column(ca_df,'ctr',1.0)



1 outlier found!!!




In [35]:
# Drop the rows that have a null in any PK column (nulls are not expected there),
# then count the remaining rows.
# NOTE(review): the PK columns are listed by hand here rather than derived
# from sa_ms_df - keep them in sync with the mapping spec.
sa_df_not_null_pk = sa_df.na.drop(subset=['search_term', 'device', 'date'])
sa_df_not_null_pk.count()

                                                                                

7143661

In [36]:
# Create an "id" column for scrape_appearances by concatenating search_term, device and date.
# NOTE(review): the parts are joined without a separator, so different key
# combinations could in principle produce the same id - confirm this is acceptable.
sa_df_not_null_pk = sa_df_not_null_pk.select("*", concat(sa_df_not_null_pk.search_term, sa_df_not_null_pk.device, sa_df_not_null_pk.date)
                         .alias("id"))
sa_df_not_null_pk.show(3)

[Stage 96:>                                                         (0 + 1) / 1]

+--------------------+----------+-------+------------+--------------------+
|         search_term|      date| device|scrape_count|                  id|
+--------------------+----------+-------+------------+--------------------+
|$100 000 personal...|2022-05-13|desktop|           1|$100 000 personal...|
|$135 nike men's b...|2022-05-13|desktop|           1|$135 nike men's b...|
|$199 michael star...|2022-05-13|desktop|           1|$199 michael star...|
+--------------------+----------+-------+------------+--------------------+
only showing top 3 rows



                                                                                

In [37]:
# There was 1 pair of duplicated rows in the scrape_appearances DataFrame, which is not
# expected to exist, so drop it and count the remaining rows.
# NOTE(review): dropDuplicates() is called without a column subset, so presumably only
# rows identical in every column are removed - confirm that covers PK duplicates.
sa_df_not_null_pk = sa_df_not_null_pk.dropDuplicates()
sa_df_not_null_pk.count()

                                                                                

7143660

In [38]:
# Create the same "id" (search_term + device + date) for competitor_appearances
# so its rows can be matched with scrape_appearances.
ca_df = ca_df.select("*", concat(ca_df.search_term, ca_df.device, ca_df.date)
                   .alias("id"))

In [39]:
# Register both DataFrames as temp views so SQL can check whether each
# competitor_appearances row has a corresponding row in scrape_appearances
sa_df_not_null_pk.createOrReplaceTempView('scrape_appearances')
ca_df.createOrReplaceTempView('competitor_appearances')

In [40]:
# Count the competitor_appearances rows that have no corresponding id in scrape_appearances.
# NOT EXISTS is used instead of NOT IN because NOT IN is NULL-unsafe: a single
# NULL id in scrape_appearances would make it match nothing, and rows with a
# NULL id in competitor_appearances would be silently left out of the count.
query = ("select count(*) as no_corresponding from competitor_appearances ca "
         "where not exists (select 1 from scrape_appearances sa where sa.id = ca.id)")
result = get_spark().sql(query)
result.show()
# assert result.rdd.collect()[0][0] == 0



+----------------+
|no_corresponding|
+----------------+
|               1|
+----------------+



                                                                                

In [41]:
# List the competitor_appearances records that have no corresponding id in scrape_appearances.
# NOT EXISTS is used instead of NOT IN because NOT IN is NULL-unsafe: a single
# NULL id in scrape_appearances would make it return no rows, and rows with a
# NULL id in competitor_appearances would be silently left out.
query = ("select id,search_term,date,device,domain from competitor_appearances ca "
         "where not exists (select 1 from scrape_appearances sa where sa.id = ca.id)")
result = get_spark().sql(query)
result.show(truncate=False)



+-----------------------------------------------+------------------------------+----------+-------+---------+
|id                                             |search_term                   |date      |device |domain   |
+-----------------------------------------------+------------------------------+----------+-------+---------+
|no entry in scrape appearancesdesktop2022-05-13|no entry in scrape appearances|2022-05-13|desktop|click.com|
+-----------------------------------------------+------------------------------+----------+-------+---------+



                                                                                

In [42]:
# Count the outlier records: competitor_appearances rows joined to scrape_appearances
# on id where sponsored_appearances > scrape_count
query = "select count(*) as outlier_sponsored_appearances from (select ca.id, sponsored_appearances, scrape_count from competitor_appearances ca inner join scrape_appearances sa on ca.id = sa.id) as jn where jn.sponsored_appearances > jn.scrape_count"
result = get_spark().sql(query)
result.show()



+-----------------------------+
|outlier_sponsored_appearances|
+-----------------------------+
|                       306589|
+-----------------------------+



                                                                                

In [43]:
# Show a sample of the outlier records (same join as the count above, returning
# id, sponsored_appearances and scrape_count) where sponsored_appearances > scrape_count
query = "select * from (select ca.id, sponsored_appearances, scrape_count from competitor_appearances ca inner join scrape_appearances sa on ca.id = sa.id) as jn where jn.sponsored_appearances > jn.scrape_count"
result = get_spark().sql(query)
result.show()

[Stage 135:>                                                        (0 + 1) / 1]

+--------------------+---------------------+------------+
|                  id|sponsored_appearances|scrape_count|
+--------------------+---------------------+------------+
|0 balance credit ...|                   11|           8|
|0 balance credit ...|                    9|           8|
|0 balance credit ...|                   11|           8|
|0 balance credit ...|                   11|           8|
|0 balance transfe...|                   16|           8|
|0 balance transfe...|                   11|           8|
|0 balance transfe...|                   15|           8|
|0 transfer credit...|                   12|           8|
|0 transfer credit...|                   18|           8|
|0 transfer credit...|                   17|           8|
|0 transfer credit...|                   14|           8|
|0 transfer credit...|                   19|           8|
|0 transfer credit...|                   13|           8|
|0 transfer credit...|                   10|           8|
|1 foot extens

                                                                                