In [0]:
%run "/Workspace/Users/varundwivedivaranasi@gmail.com/dataquality/Shared/validation_utils/data_quality_validator"

In [0]:
%run "/Workspace/Users/varundwivedivaranasi@gmail.com/dataquality/Shared/validation_utils/data_masker"

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from typing import List, Dict
import json
import os
import datetime


In [0]:
# Root of the storage volume; every location used by this notebook is derived from it.
volume = "/Volumes/workspace/myschema/myvol"

# Source data, reject output location, and the two JSON configuration files.
input_path = volume + "/source/resturant_json_data.json"
reject_path = volume + "/reject"
rules_path = volume + "/validation_rules/quality_rules.json"
masking_field = volume + "/masking_fields/masking_fields.json"

In [0]:
# Load the raw JSON source; the "multiline" option lets a single JSON record
# span several lines of the file instead of requiring one record per line.
df_source = (
    spark.read
    .option("multiline", "true")
    .json(input_path)
)

In [0]:
# Read the validation rule file into `quality_rules`.
# JSON is UTF-8 by specification, so decode it explicitly rather than relying
# on the platform's locale-dependent default encoding.
with open(rules_path, "r", encoding="utf-8") as f:
    quality_rules = json.load(f)

In [0]:
# Read the masking field file into `masking_list`.
# JSON is UTF-8 by specification, so decode it explicitly rather than relying
# on the platform's locale-dependent default encoding.
with open(masking_field, "r", encoding="utf-8") as f:
    masking_list = json.load(f)

# Falls back to an empty list when the "fields_to_mask" key is absent.
fields_to_mask = masking_list.get("fields_to_mask", [])

In [0]:
# Map the textual rating onto a numeric score; any other text (or a missing
# rating) becomes -1.
rating_score = (
    when(col("ratings") == "Excellent", lit(4))
    .when(col("ratings") == "Very Good", lit(3))
    .when(col("ratings") == "Good", lit(2))
    .when(col("ratings") == "Average", lit(1))
    .otherwise(lit(-1))
)

# Flatten the nested payload: explode the `restaurants` collection into one row
# per element, lift the nested fields to top-level columns, then drop the
# response-envelope columns and the now-redundant nested struct.
df = (
    df_source
    .withColumn("restaurants", explode("restaurants"))
    .withColumn("id", col("restaurants.restaurant.id"))
    .withColumn("restaurant name", col("restaurants.restaurant.name"))
    .withColumn("cuisines", col("restaurants.restaurant.cuisines"))
    .withColumn("ratings", col("restaurants.restaurant.user_rating.rating_text"))
    .withColumn("city", col("restaurants.restaurant.location.city"))
    # explode_outer keeps rows whose establishment_types is null or empty.
    .withColumn(
        "establishment_types",
        explode_outer(col("restaurants.restaurant.establishment_types")),
    )
    .withColumn("deeplink", col("restaurants.restaurant.deeplink"))
    .withColumn("rating", rating_score)
    .withColumn("rating", col("rating").cast("int"))
    # NOTE(review): card_number is sourced from the `apikey` field — confirm this is intended.
    .withColumn("card_number", col("restaurants.restaurant.apikey"))
    .drop(
        "code",
        "message",
        "results_found",
        "results_shown",
        "results_start",
        "status",
        "restaurants",
    )
)

In [0]:
# Preview the flattened data without the columns configured for masking, so
# their unmasked values are not rendered into (and saved with) notebook output.
# Assumes fields_to_mask is a list of column-name strings — TODO confirm;
# DataFrame.drop silently ignores names that are not in the schema.
df.drop(*fields_to_mask).display()

In [0]:
# NOTE(review): `reduce` is not referenced in this cell; this import rebinds the
# name after the star import from pyspark.sql.functions and is presumably relied
# on by the %run helper code — confirm before moving or removing it.
from functools import reduce

# Apply the loaded quality rules to the flattened frame, then hand the reject
# location to the validator's error logging.
validator = DataQualityValidator(
    df,
    notebook_name="Restaurant_Metadata_Validation",
)
validated_df = validator.run_checks(quality_rules)
validator.log_errors_to_blob(reject_path)

In [0]:
# Mask the configured fields on the validated frame and show the result.
masker = DataMasker(fields_to_mask)
masked_df = masker.mask(validated_df)
display(masked_df)

In [0]:
%sql
-- Inspect the validation summary table (presumably written by the validator run above; confirm).
SELECT *
FROM workspace.myschema.validation_summary;