In [65]:
from pyspark.sql import SparkSession
import json

In [66]:
spark = SparkSession.builder.appName("learning-dataframe").getOrCreate()

In [67]:
spark

In [68]:
# cols, file_names as variable
# input json config file, location for csv files, url's --> output json with results 


from pyspark.sql.functions import col,isnan,when,count

def input_csv_file(file_name:str):
    df = spark.read.option('header',"true").csv(file_name, inferSchema = True)
    return df


def input_txt_file(file:str):
    input_list = spark.read.text(file).collect()
    #file2 = spark.read.text(detail_file).collect()
    
    return input_list


def missing_attribute_test(df:str, columns:list):
    
    #columns = ["Product_Code","Product_Name", "Search_Term", "rank","shallow_url","DetailURL"]
    
    return {col:("Missing" if df.filter(df[col].isNull()).count()>0 else "No Missing") for col in columns}


# def missing_attribute_test_detail(df:str, columns:list):
    
#     #columns = ["URL","Product_Name", "Sku", "Availability","Price","Rating","Review","ImageURL","Old_price"]
    
#     return {col:df.filter(df[col].isNull()).count() for col in columns}

    

def non_empty_results_for_each_search_term_test(df:str):
    
    result = dict()
    term_list = df.select("Search_Term").distinct().collect()
    for col in term_list:
        col = col["Search_Term"]
        if int(df.where(df["Search_Term"] == col).count()) == 0:
            result[col] = "Empty"
        else:
            result[col] = "Non-Empty"

    return result


def search_terms_not_in_csv_test(df:str, search_list:list):
# file contains the search terms
# need to match the search terms in the file with the search terms in the csv
    result = dict()
    for search_term in search_list:
        search_term_from_list = search_term["value"]
        
        # find this search term in csv
# df_filtered.where(df_filtered["Search_Term"] == term)

        if df.where(df["Search_Term"] == search_term_from_list):
            result[search_term_from_list] = "Present"
        else:
            result[search_term_from_list] = "Absent"
    
    return result

# are all the products listed included in the scrape?
def products_not_listed_in_scrape_test(df:str, url_list:list):
    
    result = dict()
    for url in url_list:
        url_from_list = url["value"]
        
        if df.where(df["URL"] == url_from_list):
            result[url_from_list] = "Present"
        else:
            result[url_from_list] = "Absent"
            
    return result

def rank_ordering_test(df:str):
    
    result = dict()
    term_list = df.select("Search_Term").distinct().collect()
    for term in term_list:
        term = term["Search_Term"]
        infered_sum = int(df.where(df["Search_Term"] == term).select("rank").groupBy().sum().collect()[0][0])
        n = int(df.where(df["Search_Term"] == term).count())
        GT_sum = (n)*(n+1)/2
        
        if infered_sum == int(GT_sum):
            result[term] = "ordered" 
        else:
            result[term] = "un-ordered" 
    
    return result


In [69]:
# input_json = """{
#   "search_csv": "bodylab_search.csv",
#   "detail_csv": "bodylab_detail.csv",
#   "search_txt": "bodylab24_search.txt",
#   "detail_txt": "bodylab24_detail.txt",
#   "tests":{
#     "missing_attribute_test": {
#       "status": 1,
#       "on_file": "search csv",
#       "columns": ["Product_Code","Product_Name", "Search_Term", "rank","shallow_url","DetailURL"]
#     },
#     "non_empty_results_for_each_search_term_test":{
#       "status": 1,
#       "on_file": "search csv"
#     },
#     "search_terms_not_in_csv_test":{
#       "status": 1,
#       "on_file": "search csv"
#     },
#     "rank_ordering_test":{
#       "status": 1,
#       "on_file": "search csv"
#     }
#   }
# }"""

# with open("input.json") as f:
#     json_data = json.load(f)
# print(json_data)
# search_csv = json_data["search_csv"]
# detail_csv = json_data["detail_csv"]
# search_txt_file = json_data["search_txt"]
# detail_txt_file = json_data["detail_txt"]

# for test in json_data["tests"]:
#     if test == "missing_attribute_test"

def json_extract(obj, key):
    """Recursively fetch values from nested JSON."""
    arr = []

    def extract(obj, arr, key):
        """Recursively search for values of key in JSON tree."""
        if isinstance(obj, dict):
            for k, v in obj.items():
                if isinstance(v, (dict, list)):
                    extract(v, arr, key)
                elif k == key:
                    arr.append(v)
        elif isinstance(obj, list):
            for item in obj:
                extract(item, arr, key)
        return arr

    values = extract(obj, arr, key)
    return values



In [70]:
#TODO:1) change input json singature to include csv_type before the type of test - Done
# 2) organize the driver_fn to run tests with minimum if/else statements - Done
# 3) present output json is verbose, add a succint version of result (ex: which columns are missing etc.)
# 4) break the concerns into smaller functions (example: make output_json function to output json)
# 5) inside main_Driver fn incldue : detail driver fn and search driver fn - Done
# 6) replace if/else with dictionary: dict.get(x, default) -  Done
# 7) use try, except, else, finally for handling file and json exceptions
# 8) export jupter ntbk script to vscode and push
# 9) run for detail csv
# 10) separate concerns in run_fns 

run_logic = {
    "missing_attribute_test":missing_attribute_test,
    "non_empty_results_for_each_search_term_test":non_empty_results_for_each_search_term_test,
    "search_terms_not_in_csv_test":search_terms_not_in_csv_test,
    "rank_ordering_test":rank_ordering_test,
    "products_not_listed_in_scrape_test":products_not_listed_in_scrape_test,
    "products_not_listed_in_scrape_test":products_not_listed_in_scrape_test
}



def run_fns(file):
    
    run_id = json_extract(file, "run_id")[0]
    search_csv = json_extract(file, "search_csv")[0]
    detail_csv = json_extract(file, "detail_csv")[0]
    search_txt = json_extract(file, "search_txt")[0]
    detail_txt = json_extract(file, "detail_txt")[0]
    
#     if json_data["tests"]["missing_attribute_test"]["on_file"] == "search_csv":
#         search_columns = json_data["tests"]["missing_attribute_test"]["columns"]
        
#     elif json_data["tests"]["missing_attribute_test"]["on_file"] == "detail_csv":
#         detail_columns = json_data["tests"]["missing_attribute_test"]["columns"]
        
    for search_test in file["tests"]["search_test"]:
        
        search_df = input_csv_file(search_csv)
        search_list = input_txt_file(search_txt)
        
        action = run_logic.get(search_test)
        print(action)
        
        if action == missing_attribute_test and file["tests"]["search_test"]["missing_attribute_test"]["status"] == 1:
            cols = file["tests"]["search_test"]["missing_attribute_test"]["columns"]
            search_missing_attribute = action(df = search_df, columns = cols)
        
        if action == non_empty_results_for_each_search_term_test and file["tests"]["search_test"]["non_empty_results_for_each_search_term_test"]["status"] == 1:
            search_non_empty_attributes = action(df = search_df)
            
        if action == search_terms_not_in_csv_test and file["tests"]["search_test"]["search_terms_not_in_csv_test"]["status"] == 1:
            search_terms_not_in_csv = action(df = search_df, search_list = search_list)
    
        if action == rank_ordering_test and file["tests"]["search_test"]["rank_ordering_test"]["status"] == 1:
            search_rank_order = action(df = search_df)
    
    for detail_test in file["tests"]["detail_test"]:
        
        detail_df = input_csv_file(detail_csv)
        url_list = input_txt_file(detail_txt)
        
        action = run_logic.get(detail_test)
        print(action)
        
        if action == missing_attribute_test and file["tests"]["detail_test"]["missing_attribute_test"]["status"] == 1:
            cols_2 = file["tests"]["detail_test"]["missing_attribute_test"]["columns"]
            detail_missing_attribute = action(df = detail_df, columns = cols_2)
        
        if action == products_not_listed_in_scrape_test and file["tests"]["detail_test"]["products_not_listed_in_scrape_test"]["status"] == 1:
            detail_products_not_listed_in_scrape = action(df = detail_df, url_list = url_list)
            

#     if json_extract(json_data, "detail_txt")[0] == "search_csv":
#     search_df = input_csv_file(file_name = search_csv)
#     search_list = input_txt_file(file = search_txt)

#     search_missing_attribute = missing_attribute_test(search_df,columns = ["Product_Code","Product_Name", "Search_Term", "rank","shallow_url","DetailURL"])

#     search_non_empty_attributes = non_empty_results_for_each_search_term_test(search_df)

#     search_terms_not_in_csv = search_terms_not_in_csv_test(search_df, search_list)

#     search_rank_order = rank_ordering_test(search_df)

    output = {"run_id": f"{run_id}",
        "search_csv": f"{search_csv}",
        "detail_csv": f"{detail_csv}",
        "search_txt": f"{search_txt}",
        "detail_txt": f"{detail_txt}",
        "search_missing_attributes": search_missing_attribute,
        "search_non-empty_attributes":search_non_empty_attributes,
        "search_terms_not_in_csv":search_terms_not_in_csv,
        "search_rank_order":search_rank_order,
        "detail_missing_attribute": detail_missing_attribute,
        "detail_products_not_listed_in_scrape": detail_products_not_listed_in_scrape
    }
    
    return output
# df = input_csv_file(file_name = "bodylab_detail.csv")
#     search_list = input_txt_file(file = "bodylab24_search.txt")
#     url_list = input_txt_file(file = "bodylab24_detail.txt")
#     a=missing_attribute_test(df,columns = ["Product_Code","Product_Name", "Search_Term", "rank","shallow_url","DetailURL"])
#     b=non_empty_results_for_each_search_term_test(df)
#     c = search_terms_not_in_csv_test(df, search_list)
#     d = rank_ordering_test(df)

In [71]:
with open("input2.json") as f:
    json_data = json.load(f)
  
output = []
def run_driver_fn(json_data):
    for file in json_data["runs"]:
        output.append(run_fns(file))
        
run_driver_fn(json_data)

run_index = json_data["run_index"]

output = """
run_index = {0},
runs:{1}
""".format(run_index, output)

<function missing_attribute_test at 0x7f18d7c014c0>
<function non_empty_results_for_each_search_term_test at 0x7f18d7c01940>
<function search_terms_not_in_csv_test at 0x7f18d7c01700>
<function rank_ordering_test at 0x7f18d7c01dc0>
<function missing_attribute_test at 0x7f18d7c014c0>
<function products_not_listed_in_scrape_test at 0x7f18d7c01af0>
<function missing_attribute_test at 0x7f18d7c014c0>
<function non_empty_results_for_each_search_term_test at 0x7f18d7c01940>
<function search_terms_not_in_csv_test at 0x7f18d7c01700>
<function rank_ordering_test at 0x7f18d7c01dc0>
<function missing_attribute_test at 0x7f18d7c014c0>
<function products_not_listed_in_scrape_test at 0x7f18d7c01af0>


In [73]:
output = """run_index = {0},
runs:{1}
""".format(run_index, output)

with open("output.json", "w") as outfile:
    json.dump(output, outfile, indent = 4)

In [74]:

output

"run_index = 1,\nruns:run_index = 1,\nruns:\nrun_index = 1,\nruns:[{'run_id': 'msje', 'search_csv': 'bodylab_search.csv', 'detail_csv': 'bodylab_detail.csv', 'search_txt': 'bodylab24_search.txt', 'detail_txt': 'bodylab24_detail.txt', 'search_missing_attributes': {'Product_Code': 'No Missing', 'Product_Name': 'No Missing', 'Search_Term': 'No Missing', 'rank': 'No Missing', 'shallow_url': 'No Missing', 'DetailURL': 'No Missing'}, 'search_non-empty_attributes': {'Riegel mit hohem Proteingehalt': 'Non-Empty', 'Gewichtsgewinner': 'Non-Empty', 'Kreatin': 'Non-Empty', 'Riegel+mit+hohem+Proteingehalt': 'Non-Empty', 'Massengewinner': 'Non-Empty', 'Protein+Pulver': 'Non-Empty', 'Gewinner': 'Non-Empty', 'Protein-Bodybuilding': 'Non-Empty', 'isoliertes+Eiweiß': 'Non-Empty', 'Proteinriegel': 'Non-Empty', 'vor dem Training': 'Non-Empty', 'Aminos&auml;uren': 'Non-Empty', 'Aminosäuren': 'Non-Empty', 'Aminos': 'Non-Empty', 'Riegel': 'Non-Empty', 'Eiwei': 'Non-Empty', 'Bände': 'Non-Empty', 'zus&auml;tzl