In [0]:
# Display the active SparkContext. `sc` is not defined in this notebook; it is
# presumably injected by the Databricks runtime.
sc

## **Connecting to Data Lake and accessing the file**

In [0]:
pip install azure-storage-file-datalake

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
import json
from azure.storage.filedatalake import DataLakeServiceClient, FileSystemClient, DataLakeFileClient
 
# Container (file system) that holds the raw Swiggy export.
file_system_name = "swiggy-data"

# The ADLS connection string comes from the Databricks secret scope rather than being
# hard-coded; the resulting service client is shared by the download helper below.
CONNECTION_STRING = dbutils.secrets.get(scope="license_key", key="connection-string")
data_lake_service_client = DataLakeServiceClient.from_connection_string(CONNECTION_STRING)
 
 
def download_and_parse_json(file_system_name, file_path):
    """Download ``file_path`` from ADLS container ``file_system_name`` and parse it as JSON.

    Uses the module-level ``data_lake_service_client``. The payload is decoded as UTF-8.

    Returns:
        The parsed JSON object, or ``None`` if the download or decoding/parsing fails
        (the error is printed rather than raised).
    """
    fs_client = data_lake_service_client.get_file_system_client(file_system_name)
    file_client = fs_client.get_file_client(file_path)
    try:
        raw_bytes = file_client.download_file().readall()
        parsed = json.loads(raw_bytes.decode('utf-8'))
    except Exception as exc:
        print(f"Failed to download and parse JSON: {exc}")
        return None
    print("JSON data successfully retrieved and parsed")
    return parsed
 
if __name__ == "__main__":
    # Fetch the raw Swiggy export from the container configured above.
    data = download_and_parse_json(file_system_name, "swiggy-raw/swiggy.json")
    # download_and_parse_json swallows errors and returns None; stop here instead of
    # letting the next cell fail with an opaque AttributeError on `data.items()`.
    if data is None:
        raise RuntimeError(
            "Could not load swiggy-raw/swiggy.json; see the error printed above"
        )
    print("fetch completed")
   

JSON data successfully retrieved and parsed
fetch completed


## **Creating the Restaurant and Menu DataFrames**

In [0]:
def _restaurant_record(restaurant_id, restaurant_data, city):
    """Flatten one raw restaurant entry into a row dict for df_restaurant."""
    return {
        'Restaurant_ID': restaurant_id,
        'Restaurant_Name': restaurant_data.get('name'),
        'City': city,
        'Rating': restaurant_data.get('rating'),
        'Rating_Count': restaurant_data.get('rating_count'),
        'Cost': restaurant_data.get('cost'),
        'Cuisine': restaurant_data.get('cuisine'),
        'Lic_No': restaurant_data.get('lic_no'),
        'Link': restaurant_data.get('link'),
        'Address': restaurant_data.get('address'),
    }


def _menu_records(restaurant_id, restaurant_data):
    """Yield one row dict per menu item of a raw restaurant entry, for df_menu."""
    # `or {}` also covers an explicit null "menu"/category value, which .get(..., {}) does not.
    for category, items in (restaurant_data.get('menu') or {}).items():
        for item_name, item_data in (items or {}).items():
            yield {
                'Restaurant_ID': restaurant_id,
                'Category': category,
                'Item_Name': item_name,
                'Price': item_data.get('price'),
                'Veg_or_Non_Veg': item_data.get('veg_or_non_veg'),
                # Synthetic per-item key, used later for de-duplication.
                'id': f"{restaurant_id}_{item_name}",
            }


def _iter_restaurants(raw_data):
    """Yield (city_label, restaurant_id, restaurant_data) for every restaurant in the raw JSON.

    Two layouts are handled:
      * a city dict with a direct 'restaurants' key -> city label is the city name;
      * a city dict of sub-areas, each with its own 'restaurants' key
        -> city label is "<sub_area>,<city>".
    Non-dict city or sub-area values are skipped.
    """
    for city_name, city_data in raw_data.items():
        if not isinstance(city_data, dict):
            continue
        if 'restaurants' in city_data:
            for restaurant_id, restaurant_data in (city_data.get('restaurants') or {}).items():
                yield city_name, restaurant_id, restaurant_data
            continue
        for sub_area_name, sub_area_data in city_data.items():
            # Skip non-dict sub-area values instead of crashing on .get().
            if not isinstance(sub_area_data, dict):
                continue
            city_label = f"{sub_area_name},{city_name}"
            for restaurant_id, restaurant_data in (sub_area_data.get('restaurants') or {}).items():
                yield city_label, restaurant_id, restaurant_data


records_restaurant = []
records_menu = []

for city, restaurant_id, restaurant_data in _iter_restaurants(data):
    records_restaurant.append(_restaurant_record(restaurant_id, restaurant_data, city))
    records_menu.extend(_menu_records(restaurant_id, restaurant_data))

df_restaurant = spark.createDataFrame(records_restaurant)
df_menu = spark.createDataFrame(records_menu)
 
# df_restaurant.show(truncate=False)
# df_menu.show(truncate=False)

In [0]:
# Show the inferred schemas of both raw DataFrames (restaurant first, then menu).
for schema_df in (df_restaurant, df_menu):
    schema_df.printSchema()

root
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Cost: string (nullable = true)
 |-- Cuisine: string (nullable = true)
 |-- Lic_No: string (nullable = true)
 |-- Link: string (nullable = true)
 |-- Rating: string (nullable = true)
 |-- Rating_Count: string (nullable = true)
 |-- Restaurant_ID: string (nullable = true)
 |-- Restaurant_Name: string (nullable = true)

root
 |-- Category: string (nullable = true)
 |-- Item_Name: string (nullable = true)
 |-- Price: string (nullable = true)
 |-- Restaurant_ID: string (nullable = true)
 |-- Veg_or_Non_Veg: string (nullable = true)
 |-- id: string (nullable = true)



In [0]:
# df_restaurant.display()
# df_menu.display()

In [0]:
# df_restaurant.count()

In [0]:
# df_menu.count()

## **Handling Duplicates**

In [0]:
from pyspark.sql.functions import *

# df_restaurant.groupBy(col("Restaurant_ID")).agg(count("*").alias("Count")).filter(col("Count")>1).orderBy(["Count"], ascending = False).show()

In [0]:
# A restaurant row is considered a duplicate when ID, name and address all match.
dedup_keys = ["Restaurant_ID", "Restaurant_Name", "Address"]

# Keep the duplicate groups around for inspection (see the commented checks below).
duplicate_restaurants = (
    df_restaurant
    .groupBy(*dedup_keys)
    .count()
    .where(col("count") > 1)
)

df_restaurant = df_restaurant.dropDuplicates(dedup_keys)
 

In [0]:
# duplicate_restaurants.count()

In [0]:
# df_restaurant.count()

In [0]:
# df_restaurant.groupBy("Restaurant_ID", "Restaurant_Name", "Address").count().filter(col("count") > 1).count()

### **Removing Symbols and NA from Cost**

In [0]:
# Keep only the digits of Cost (drops currency symbols and text). Anything with no digits
# at all -- including 'NA' -- becomes null rather than '' so that the per-city mean below
# is actually imputed for it (coalesce does not treat '' as missing).
cost_digits = regexp_replace(col('Cost'), '[^0-9]', '')
df_restaurant = df_restaurant.withColumn(
    'Cost',
    when(cost_digits == '', None).otherwise(cost_digits)
)

# Mean cost per city, used to fill in missing costs.
df_mean = df_restaurant.groupBy('City').agg(round(mean(col('Cost')), 2).alias('Mean_Cost'))

df_mean_restaurant = df_restaurant.join(df_mean, on='City', how='left')

df_restaurant = df_mean_restaurant.withColumn(
    'Cost',
    round(coalesce(col('Cost'), col('Mean_Cost')), 2)
).drop('Mean_Cost')

## **Handling Null Values**

In [0]:
# df_restaurant.select([count(when(isnull(column),column)).alias(column) for column in df_restaurant.columns]).show()

In [0]:
# Drop any restaurant row that still has a null in any column.
df_restaurant = df_restaurant.na.drop()

# df_restaurant.select([count(when(isnull(column),column)).alias(column) for column in df_restaurant.columns]).show()

In [0]:
# df_restaurant.count()

## **Handling NA Values**

In [0]:
# df_restaurant.select([count(when(col(column) == 'NA', 1)).alias(column) for column in df_restaurant.columns]).show()

In [0]:
# Collect (to the driver) the IDs of restaurants whose name is the literal 'NA'.
ids_df = (
    df_restaurant
    .where(col("Restaurant_Name") == 'NA')
    .select("Restaurant_ID")
    .collect()
)

In [0]:
NA_ids = [r["Restaurant_ID"] for r in ids_df]

# Remove every row whose Restaurant_ID belongs to a restaurant named 'NA'.
df_restaurant = df_restaurant.where(~col("Restaurant_ID").isin(NA_ids))

In [0]:
# df_restaurant.select([count(when(col(column) == 'NA', 1)).alias(column) for column in df_restaurant.columns]).show()

### **Handling Rating_count & Rating**

In [0]:
# df_restaurant.select(col("Rating_Count")).distinct().display()

In [0]:
# Convert the free-text rating count ("100+ ratings", "1K+ ratings", "Too Few Ratings",
# "NA") into an integer. Values matching none of the patterns become null.
rating_count_col = col('Rating_count')
df_restaurant = df_restaurant.withColumn(
    'Rating_count',
    when(
        rating_count_col.rlike('^\\d+\\+ ratings$'),
        regexp_replace(rating_count_col, '\\+ ratings', '').cast('int')
    )
    .when(
        # Also accept fractional thousands such as "1.5K+ ratings"; the previous
        # '^\\d+K\\+ ratings$' pattern sent those to null. Round before casting so
        # floating-point error (e.g. 1.15 * 1000 = 1149.99...) cannot truncate.
        rating_count_col.rlike('^\\d+(\\.\\d+)?K\\+ ratings$'),
        round(regexp_replace(rating_count_col, 'K\\+ ratings', '') * 1000).cast('int')
    )
    .when(rating_count_col.isin('NA', 'Too Few Ratings'), 0)
    .otherwise(None)
)

In [0]:
# df_restaurant.select(col("Rating_Count")).distinct().display()

In [0]:
# df_restaurant.select([count(when(isnull(column),column)).alias(column) for column in df_restaurant.columns]).show()

In [0]:
# df_restaurant.select([count(when(col(column) == '--', 1)).alias(column) for column in df_restaurant.columns]).show()

In [0]:
# '--' marks an unrated restaurant; map it to 0 and leave every other value untouched.
# NOTE(review): 0 is indistinguishable from a genuine rating downstream; consider null.
rating_col = col("Rating")
df_restaurant = df_restaurant.withColumn(
    'Rating',
    when(rating_col == '--', lit(0)).otherwise(rating_col),
)

In [0]:
# df_restaurant.select([count(when(col(column) == '--', 1)).alias(column) for column in df_restaurant.columns]).show()

### **Removing Unwanted Columns**

In [0]:
# Inspect the current df_restaurant schema before dropping unwanted columns.
df_restaurant.printSchema()

root
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Cost: string (nullable = true)
 |-- Cuisine: string (nullable = true)
 |-- Lic_No: string (nullable = true)
 |-- Link: string (nullable = true)
 |-- Rating: string (nullable = true)
 |-- Rating_count: integer (nullable = true)
 |-- Restaurant_ID: string (nullable = true)
 |-- Restaurant_Name: string (nullable = true)



In [0]:
# The restaurant URL is not needed in the silver table.
unwanted_columns = ["Link"]
df_restaurant = df_restaurant.drop(*unwanted_columns)

### **Changing the Schema of df_restaurant**

In [0]:
# Inspect df_restaurant's schema before casting it to the target types below.
df_restaurant.printSchema()

root
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Cost: string (nullable = true)
 |-- Cuisine: string (nullable = true)
 |-- Lic_No: string (nullable = true)
 |-- Rating: string (nullable = true)
 |-- Rating_count: integer (nullable = true)
 |-- Restaurant_ID: string (nullable = true)
 |-- Restaurant_Name: string (nullable = true)



In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DoubleType
 
# Target column types for df_restaurant; the select below also fixes the column order.
# NOTE(review): unparseable values (e.g. a non-numeric Restaurant_ID) presumably become
# null here rather than failing -- confirm against the cluster's ANSI setting.
restaurant_columns = [
    ("Address", StringType()),
    ("City", StringType()),
    ("Cost", IntegerType()),
    ("Cuisine", StringType()),
    ("Lic_No", StringType()),
    ("Rating", DoubleType()),
    ("Rating_count", IntegerType()),
    ("Restaurant_ID", IntegerType()),
    ("Restaurant_Name", StringType()),
]
new_schema = StructType([StructField(name, dtype, True) for name, dtype in restaurant_columns])

df_restaurant = df_restaurant.select(*(
    col(field.name).cast(field.dataType).alias(field.name) for field in new_schema
))

In [0]:
# df_restaurant.count()

In [0]:
[field.name for field in df_restaurant.schema.fields]

['Address',
 'City',
 'Cost',
 'Cuisine',
 'Lic_No',
 'Rating',
 'Rating_count',
 'Restaurant_ID',
 'Restaurant_Name']

In [0]:
pip install pycryptodome

Collecting pycryptodome
  Obtaining dependency information for pycryptodome from https://files.pythonhosted.org/packages/ea/66/6f2b7ddb457b19f73b82053ecc83ba768680609d56dd457dbc7e902c41aa/pycryptodome-3.21.0-cp36-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata
  Downloading pycryptodome-3.21.0-cp36-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.4 kB)
Downloading pycryptodome-3.21.0-cp36-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.3 MB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/2.3 MB[0m [31m?[0m eta [36m-:--:--[0m
[2K   [91m━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.2/2.3 MB[0m [31m4.4 MB/s[0m eta [36m0:00:01[0m
[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m2.3/2.3 MB[0m [31m36.4 MB/s[0m eta [36m0:00:01[0m
[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m2.3/2.3 MB[0m [31m36.4 MB/s[0m eta [36m0:00:01[0m
[2K   [90m━━━━━━━━━━

In [0]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
from Crypto.Cipher import AES
import base64
import os

_AES_BLOCK_SIZE = 16


def _aes_key_bytes(key):
    """Return ``key`` as exactly 16 UTF-8 bytes (space-padded / truncated) for AES-128."""
    # Encode *before* padding/truncating: slicing the str to 16 characters gives more than
    # 16 bytes for non-ASCII keys. For ASCII keys this equals the former
    # key.ljust(16)[:16].encode('utf-8'), so existing ciphertexts stay decryptable.
    # NOTE(review): this is not a real key-derivation function; consider storing a raw
    # 16/32-byte key in the secret scope instead.
    return key.encode('utf-8').ljust(_AES_BLOCK_SIZE, b' ')[:_AES_BLOCK_SIZE]


def _space_pad(data):
    """Right-pad ``data`` with spaces to a multiple of the AES block size."""
    # Pad the encoded bytes, not the str: padding by character count leaves non-ASCII
    # input at a byte length that is not a multiple of 16, which AES rejects. For any
    # input that used to succeed, the padded bytes are identical to the old scheme.
    remainder = len(data) % _AES_BLOCK_SIZE
    return data if remainder == 0 else data + b' ' * (_AES_BLOCK_SIZE - remainder)


def encrypt_lic_no(lic_no, key):
    """Encrypt a license number with AES-128-ECB and return it Base64-encoded.

    Args:
        lic_no: Plaintext license number, or None.
        key: Secret key string; converted to 16 bytes by ``_aes_key_bytes``.

    Returns:
        Base64 ciphertext as str, None if ``lic_no`` is None, or None if encryption fails.

    NOTE(review): ECB is deterministic (equal inputs give equal ciphertexts) and the
    space padding is lossy (trailing spaces are stripped on decrypt). The scheme is kept
    so previously encrypted values remain decryptable; an authenticated mode such as
    AES-GCM would need a data migration.
    """
    if lic_no is None:
        return None
    try:
        cipher = AES.new(_aes_key_bytes(key), AES.MODE_ECB)
        encrypted_bytes = cipher.encrypt(_space_pad(lic_no.encode('utf-8')))
        return base64.b64encode(encrypted_bytes).decode('utf-8')
    except Exception:
        # Best effort inside a Spark UDF: an unencryptable value becomes null rather than
        # failing the whole job.
        return None


def decrypt_lic_no(encrypted_base64, key):
    """Reverse ``encrypt_lic_no``: Base64-decode, AES-128-ECB decrypt, strip space padding.

    Args:
        encrypted_base64: Base64 ciphertext produced by ``encrypt_lic_no``, or None.
        key: The same secret key string used for encryption.

    Returns:
        The plaintext license number, or None if the input is None or decryption /
        UTF-8 decoding fails (e.g. malformed Base64 or a ciphertext length that is not a
        multiple of 16). A wrong key may also yield garbage text rather than None.
    """
    if encrypted_base64 is None:
        return None
    try:
        encrypted_bytes = base64.b64decode(encrypted_base64.encode('utf-8'))
        cipher = AES.new(_aes_key_bytes(key), AES.MODE_ECB)
        decrypted_bytes = cipher.decrypt(encrypted_bytes)
        # Padding was spaces, so strip them (this also strips any trailing spaces that were
        # part of the original value).
        return decrypted_bytes.decode('utf-8').rstrip(' ')
    except Exception:
        return None

# Pull the AES key from the secret scope instead of embedding it in the notebook.
encryption_key = dbutils.secrets.get(scope="license_key", key="license-encryption-key")


def _encrypt_with_notebook_key(lic_no):
    """UDF body: encrypt one Lic_No value with the notebook's ``encryption_key``."""
    return encrypt_lic_no(lic_no, encryption_key)


def _decrypt_with_notebook_key(encrypted_text):
    """UDF body: decrypt one Base64 Lic_No value with the notebook's ``encryption_key``."""
    return decrypt_lic_no(encrypted_text, encryption_key)


encrypt_lic_no_udf = udf(_encrypt_with_notebook_key, StringType())
decrypt_lic_no_udf = udf(_decrypt_with_notebook_key, StringType())

# Replace the plaintext license numbers with their encrypted form.
df_restaurant = df_restaurant.withColumn("Lic_No", encrypt_lic_no_udf(col("Lic_No")))

# Decrypt the lic_no_encrypted column
# df1 = df.withColumn("Lic_No", decrypt_lic_no_udf(col("Lic_No")))


In [0]:
# df.display()

In [0]:
# df1.display()

In [0]:
# df_restaurant.display()

In [0]:
# datalake_key = dbutils.secrets.get(scope="license_key", key="datalake-key")

# spark.conf.set("fs.azure.account.key.casestudysk.dfs.core.windows.net", datalake_key)

# delta_table_path_restaurant = "abfss://swiggy-silver@casestudysk.dfs.core.windows.net/swiggy-restaurant/"

# df_restaurant.write.format("delta").mode("overwrite").save(delta_table_path_restaurant)

## **Menu**

In [0]:
# df_menu.count()

### **Handling Duplicates for df_menu**

In [0]:
# df_menu.groupBy("id", "Category").count().filter(col("count") > 1).orderBy(["count"], ascending = False).display()

In [0]:
# df_menu.groupBy("Restaurant_ID", "Category").agg(count("*").alias("Count")).filter(col("Count")>1).orderBy(["Count", "Restaurant_ID"], ascending = False).show()

In [0]:
# A menu row is a duplicate when its synthetic item id and category both match.
df_menu = df_menu.dropDuplicates(subset=["id", "Category"])

### **Checking for null values**

In [0]:
# df_menu.select([count(when(isnull(column),column)).alias(column) for column in df_menu.columns]).show()

In [0]:
# Drop any menu row that has a null in any column.
df_menu = df_menu.na.drop()

### **Checking for NA values**

In [0]:
# df_menu.select([count(when(col(column) == 'NA', 1)).alias(column) for column in df_menu.columns]).show()

In [0]:
# Keep only rows where NEITHER the item name NOR the restaurant id is the literal 'NA'.
# (Combining the two `!=` tests with `|` only removed rows where *both* were 'NA'.)
df_menu = df_menu.filter((col('Item_Name') != 'NA') & (col('Restaurant_ID') != 'NA'))

### **Changing schema of df_menu**

In [0]:
df_menu.printSchema()

root
 |-- Category: string (nullable = true)
 |-- Item_Name: string (nullable = true)
 |-- Price: string (nullable = true)
 |-- Restaurant_ID: string (nullable = true)
 |-- Veg_or_Non_Veg: string (nullable = true)
 |-- id: string (nullable = true)



In [0]:
# Target column types for df_menu; the select below also fixes the column order.
menu_columns = [
    ("Category", StringType()),
    ("Item_Name", StringType()),
    ("Price", DoubleType()),
    ("Restaurant_ID", IntegerType()),
    ("Veg_or_Non_Veg", StringType()),
    ("id", StringType()),
]
new_schema = StructType([StructField(name, dtype, True) for name, dtype in menu_columns])

df_menu = df_menu.select(*(
    col(field.name).cast(field.dataType).alias(field.name) for field in new_schema
))

In [0]:
# df_menu.count()

In [0]:
[field.name for field in df_menu.schema.fields]

['Category', 'Item_Name', 'Price', 'Restaurant_ID', 'Veg_or_Non_Veg', 'id']

In [0]:
# df_menu.display()

## **Creating an Array of Menu Items Grouped by Restaurant_ID**

In [0]:
menu_item_fields = ['Category', 'Item_Name', 'Price', 'Restaurant_ID', 'Veg_or_Non_Veg', 'id']

# One row per restaurant, with all of its menu items packed into an array of structs.
df_menu_final = (
    df_menu
    .groupBy('Restaurant_ID')
    .agg(collect_list(struct(*menu_item_fields)).alias('Menu_Items'))
)

In [0]:
# df_menu_final.display()

In [0]:
# datalake_key = dbutils.secrets.get(scope="license_key", key="datalake-key")

# spark.conf.set("fs.azure.account.key.casestudysk.dfs.core.windows.net", datalake_key)

# delta_table_path_menu = "abfss://swiggy-silver@casestudysk.dfs.core.windows.net/swiggy-menu/"

# df_menu_final.write.format("delta").mode("overwrite").save(delta_table_path_menu)

## **Joining df_restaurant and df_menu_final into df_final**

In [0]:
# Left join keeps restaurants that have no menu rows (their Menu_Items is null).
df_final = df_restaurant.join(df_menu_final, "Restaurant_ID", "left")

### **Adding Timestamp Column to df_final**

In [0]:
# Stamp every row with the time of this load; withColumn replaces the column if the cell is re-run.
df_final = df_final.withColumn("update_timestamp", current_timestamp())

In [0]:
# df_final.count()

In [0]:
[field.name for field in df_final.schema.fields]

['Restaurant_ID',
 'Address',
 'City',
 'Cost',
 'Cuisine',
 'Lic_No',
 'Rating',
 'Rating_count',
 'Restaurant_Name',
 'Menu_Items',
 'update_timestamp']

In [0]:
# Inspect the final joined schema, including the nested Menu_Items array of structs.
df_final.printSchema()

root
 |-- Restaurant_ID: integer (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Cost: integer (nullable = true)
 |-- Cuisine: string (nullable = true)
 |-- Lic_No: string (nullable = true)
 |-- Rating: double (nullable = true)
 |-- Rating_count: integer (nullable = true)
 |-- Restaurant_Name: string (nullable = true)
 |-- Menu_Items: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- Category: string (nullable = true)
 |    |    |-- Item_Name: string (nullable = true)
 |    |    |-- Price: double (nullable = true)
 |    |    |-- Restaurant_ID: integer (nullable = true)
 |    |    |-- Veg_or_Non_Veg: string (nullable = true)
 |    |    |-- id: string (nullable = true)
 |-- update_timestamp: timestamp (nullable = false)



In [0]:
# df_final.display()

In [0]:
# Authenticate to the ADLS Gen2 account with the account key from the secret scope.
datalake_key = dbutils.secrets.get(scope="license_key", key="datalake-key")
spark.conf.set("fs.azure.account.key.casestudysk.dfs.core.windows.net", datalake_key)

# Silver-layer Delta location for the joined restaurant + menu table.
delta_table_path_final = "abfss://swiggy-silver@casestudysk.dfs.core.windows.net/swiggy-final/"

# NOTE(review): every run rebuilds the full dataset from the raw JSON and appends it, so
# the table accumulates one full copy per run (distinguished only by update_timestamp).
# Confirm this snapshot-history behaviour is intended; otherwise use overwrite or MERGE.
df_final.write.save(delta_table_path_final, format="delta", mode="append")