# processor.py

In [1]:
from asyncio.log import logger
import os, ast
from config import Config
from datetime import datetime, timezone
import logging
import boto3
from pyspark.sql import SparkSession
from path import *
from sub_processor1802 import SubProcessor
from iceberg_catalog import IcebergCatalog
from broadcast import BroadCast
from udf import load_broadcast
import asyncio

import gc

# Config
AWS_PROFILE = Config.AWS_PROFILE

# Per-day log location. CURRENT_DATE is not defined in this file; it presumably
# comes from ``from path import *``. Creating the directory is an import-time
# side effect.
FILE_NAME = f"processor_execution_logs_{CURRENT_DATE}.log"
LOG_DIR = os.path.join("logs", CURRENT_DATE)
os.makedirs(LOG_DIR, exist_ok=True)
LOG_FILE = os.path.join(LOG_DIR, FILE_NAME)

# Send root-logger output to both the per-day file and the console. The file
# handler is pinned to UTF-8 so non-ASCII text in log messages cannot fail with
# a locale-dependent UnicodeEncodeError.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler(LOG_FILE, encoding="utf-8"),
        logging.StreamHandler(),
    ],
)

class Processor:
    def __init__(self):

        self.broadcast = BroadCast()
        self.sub_processor = SubProcessor(self.broadcast)
        self.iceberg_catalog = IcebergCatalog()


        # Prepare Session
        if DEBUG:
            self.spark_session = (
                SparkSession.builder.appName("Details Data Process")
                .master("local[*]")
                .config("spark.executor.memory", "3g")
                #.config("spark.memory.fraction", "0.7")
                .config("spark.driver.memory", "4g")
                #.config("spark.executor.cores", "2")
                # .config("spark.driver.cores", "1")
                # .config("spark.executor.instances", "1")
                # .config("spark.memory.offHeap.enabled", "true")
                # .config("spark.memory.offHeap.size", "2g")
                .config(
                    "spark.jars.packages",
                    "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2,"
                )
                .config(
                    "spark.sql.extensions",
                    "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
                )
                .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
                .config("spark.sql.catalog.local.type", "hadoop")
                .config("spark.sql.catalog.local.warehouse", CURATED_ZONE)
                # .config("spark.executor.memoryOverhead", "1g")
                # .config("spark.driver.memoryOverhead", "1g")
                # .config("spark.driver.cores", "1")
                .getOrCreate()
            )

        else:
            # Get AWS Profile Credentials
            self.boto3_session = boto3.Session(profile_name=AWS_PROFILE)
            self.credentials = self.boto3_session.get_credentials()
            self.aws_access_key_id = self.credentials.access_key
            self.aws_secret_access_key = self.credentials.secret_key
            self.aws_session_token = self.credentials.token

            self.spark_session = (
                SparkSession.builder.appName("Details Data Process")
                .config(
                    "spark.jars.packages",
                    "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2,"
                    "org.apache.hadoop:hadoop-aws:2.10.1,"
                    "com.amazonaws:aws-java-sdk-bundle:1.11.1026",
                )
                .config(
                    "spark.sql.extensions",
                    "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
                )
                .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
                .config("spark.sql.catalog.local.type", "hadoop")
                .config("spark.sql.catalog.local.warehouse", CURATED_ZONE)
                .config(
                    "spark.sql.catalog.local.hadoop.fs.s3a.access.key",
                    self.aws_access_key_id,
                )
                .config(
                    "spark.sql.catalog.local.hadoop.fs.s3a.secret.key",
                    self.aws_secret_access_key,
                )
                .config(
                    "spark.sql.catalog.local.hadoop.fs.s3a.session.token",
                    self.aws_session_token,
                )
                .config(
                    "spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"
                )
                .config("spark.hadoop.fs.s3a.endpoint", "s3.us-east-2.amazonaws.com")
                .config(
                    "spark.hadoop.fs.s3a.aws.credentials.provider",
                    "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
                )
                .config("spark.executor.memory", "7g")
                .config("spark.executor.memoryOverhead", "1g")
                .config("spark.driver.memory", "6g")
                .config("spark.driver.memoryOverhead", "1g")
                .config("spark.executor.cores", "3")
                .config("spark.driver.cores", "1")
                .config("spark.executor.instances", "8")
                .getOrCreate()
            )
    def preprocess_reviews_data(self,
            df_details,
            df_reviews_scores,
            df_chain_and_brand,
            df_search,
            ):
        df_processed = df_details
        logging.info("======== Before Review data process start ========")
        before_df_processed = self.sub_processor.process_review_scores_data(
            df_processed=df_processed, review_score_df=df_reviews_scores
        )
        logging.info("======== Before Review data process ended ========")

        logging.info("======== After Review data process start ========")
        after_df_processed = self.sub_processor.after_process_review_scores_data(
            df_processed=df_processed, review_score_df=df_reviews_scores
        )
        logging.info("======== After Review data process ended ========")

        return before_df_processed,after_df_processed
    def preprocess_property_flags_data(self,
            df_details,
            df_reviews_scores,
            df_chain_and_brand,
            df_search,):
        df_processed = df_details
        logging.info("======== Before Property Flags data process Started ========")
        befor_df_processed = self.sub_processor.process_property_flags_data(df_processed=df_processed)
        logging.info("======== Before Property Flags data process ended ========")

        logging.info("======== After Property Flags data process Started ========")
        after_df_processed = self.sub_processor.after_process_property_flags_data(df_processed=df_processed)
        logging.info("======== After Property Flags data process ended ========")

        return befor_df_processed,after_df_processed
    def preprocess_basic_dataprocess_chain_and_brand_data(self,
            df_details,
            df_reviews_scores,
            df_chain_and_brand,
            df_search,):
        df_processed = df_details
        logging.info("======== Before Chain and Brand data process Started ========")
        before_df_processed = self.sub_processor.process_chain_and_brand_data(
            df_processed=df_processed, df_chain_and_brand=df_chain_and_brand
        )
        logging.info("======== Before Chain and Brand data process Ended ========")
        
        logging.info("======== After Chain and Brand data process Started ========")
        after_df_processed = self.sub_processor.after_process_chain_and_brand_data(
            df_processed=df_processed, df_chain_and_brand=df_chain_and_brand
        )
        logging.info("======== After Chain and Brand data process Ended ========") 
        return before_df_processed,after_df_processed 
    
    def preprocess_commission_and_meal_plan_data(self,
            df_details,
            df_reviews_scores,
            df_chain_and_brand,
            df_search,):
        df_processed = df_details
        logging.info("======== Before Commission and meal plan data process Started ========")
        before_df_processed = self.sub_processor.process_commission_and_meal_plan_data(df_processed=df_processed, df_search=df_search)
        logging.info("======== After Commission and meal plan data process Ended ========")
        
        logging.info("======== After Commission and meal plan data process Started ========")
        after_df_processed = self.sub_processor.after_process_commission_and_meal_plan_data(df_processed=df_processed, df_search=df_search)
        logging.info("======== After Commission and meal plan data process Ended ========")
        return before_df_processed,after_df_processed 
        
    def process_details_data(
            self,
            df_details,
            df_reviews_scores,
            df_chain_and_brand,
            df_search,
    ):
        df_processed = df_details

        # Process Basic Data
        # logging.info("======== Basic data process started ========")
        # df_processed = self.sub_processor.process_basic_data(df_processed=df_processed, spark_session=self.spark_session)
        # logging.info("=======: df_processed : =====")
        # df_processed.show(5)
        logging.info("======== Basic data process ended ========")

        logging.info("======== Basic data process Started ========")
        df_processed = self.sub_processor.process_review_scores_data(
            df_processed=df_processed, review_score_df=df_reviews_scores
        )
        logging.info("======== Review data process ended ========")

        logging.info("======== Property Flags data process Started ========")
        df_processed = self.sub_processor.process_property_flags_data(df_processed=df_processed)
        logging.info("======== Property Flags data process ended ========")

        logging.info("======== Chain and Brand data process Started ========")
        df_processed = self.sub_processor.process_chain_and_brand_data(
            df_processed=df_processed, df_chain_and_brand=df_chain_and_brand
        )
        logging.info("======== Chain and Brand data process Ended ========")

        # df_processed = self.sub_processor.process_property_flags_data(df_processed=df_processed)

        logging.info("======== Commission and meal plan data process Started ========")
        df_processed = self.sub_processor.process_commission_and_meal_plan_data(df_processed=df_processed, df_search=df_search)
        logging.info("======== Commission and meal plan data process Ended ========")

        logging.info("======== USD price and price history data process Started ========")
        df_processed, df_price_history = self.sub_processor.process_usd_price_and_price_history(df_processed=df_processed, df_search=df_search, spark_session=self.spark_session)
        logging.info("======== USD price and price history data process Ended ========")

        return df_processed, df_price_history

    def process_details_localize_data(self, df_processed):
        df_processed = self.sub_processor.process_localize_data(df_processed=df_processed, spark_session=self.spark_session)
        return df_processed
    
    def read_df_details(self):
        # # Load Details Data
        logging.info("Loading Details Data =======")
        df_details = self.spark_session.read.format("json").option("multiline", "true").load(DETAILS_DATA_DIR).limit(50)
        logging.info("Details Data Loaded =======")
        return df_details
    
    def read_df_reviews_scores(self):
        logging.info("Loading Reviews Scores Data =======")
        df_reviews_scores = self.spark_session.read.format("json").option("multiline", "true").load(REVIEW_SCORES_DIR).limit(50)
        logging.info("Reviews Scores Data Loaded =======")
        return df_reviews_scores

                
    def read_df_search(self):
        logging.info("Loading Reviews Scores Data =======")
        df_search = self.spark_session.read.format("json").option("multiline", "true").load(ACCOMMODATION_SEARCH_DIR).limit(50)
        logging.info("Search Data Loaded =======")
        return df_search
    
    def read_df_chain_and_brand(self):
        logging.info("Chain and Brand Data Loaded =======")
        df_chain_and_brand = self.spark_session.read.format("json").option("multiline", "true").load(CHAIN_AND_BRAND).limit(50)
        return df_chain_and_brand
    
    def read_df_reviews(self):
        logging.info("Reviews Data Loaded =======")
        df_reviews = self.spark_session.read.format("json").option("multiline", "true").load(REVIEW_DIR).limit(50)
        return df_reviews
    
    def read_df_processed(self,df_details):
        logging.info("Processed Data Loaded =======")
        df_processed, count_df_null_location_id = self.sub_processor.process_location_data_bg(df_details, self.spark_session)
        return df_processed,count_df_null_location_id
    
    def BroadCast(self):
        # Load Broadcast Data
        logging.info("====== Broadcasting Data =======")
        self.broadcast.prepare_broadcasted_data(spark_session=self.spark_session)
        load_broadcast(self.broadcast)
        logging.info("====== Broadcasted Data =======") 

    def preprocess_basic_data(self,df_processed,df_reviews_scores,df_chain_and_brand,df_search):
        logging.info("====== Details Data process started =======")
        df_processed = self.preprocess_details_data(
            df_details=df_processed,
            df_reviews_scores=df_reviews_scores,
            df_chain_and_brand=df_chain_and_brand,
            df_search=df_search
        )
        return df_processed
    def first_process(self,df_details):
        df_processed = df_details
        logging.info("======== Basic data process started ========")
        first = self.sub_processor.first_process_basic_data(df_processed=df_processed, spark_session=self.spark_session)
        logging.info("=======: df_processed : =====")
        return first
    def second_process(self,df_details):
        df_processed = df_details
        logging.info("======== Basic data process started ========")
        second = self.sub_processor.second_process_basic_data(df_processed=df_processed, spark_session=self.spark_session)
        logging.info("=======: df_processed : =====")
        return second
    def third_process(self,df_details):
        df_processed = df_details
        logging.info("======== Basic data process started ========")
        third = self.sub_processor.third_process_basic_data(df_processed=df_processed, spark_session=self.spark_session)
        logging.info("=======: df_processed : =====")
        return third
    def fourth_process(self,df_details):
        df_processed = df_details
        logging.info("======== Basic data process started ========")
        four = self.sub_processor.fourth_process_basic_data(df_processed=df_processed, spark_session=self.spark_session)
        logging.info("=======: df_processed : =====")
        return four
    def fifth_process(self,df_details):
        df_processed = df_details
        logging.info("======== Basic data process started ========")
        fifth = self.sub_processor.fifth_process_basic_data(df_processed=df_processed, spark_session=self.spark_session)
        logging.info("=======: df_processed : =====")
        return fifth
    def sixth_process(self,df_details):         
        df_processed = df_details
        logging.info("======== Basic data process started ========")
        sixth = self.sub_processor.sixth_process_basic_data(df_processed=df_processed, spark_session=self.spark_session)
        logging.info("=======: df_processed : =====")
        return sixth
    def seventh_process(self,df_details):
        df_processed = df_details
        logging.info("======== Basic data process started ========")
        seventh = self.sub_processor.seventh_process_basic_data(df_processed=df_processed, spark_session=self.spark_session)
        logging.info("=======: df_processed : =====")
        return seventh
    def eighth_process(self,df_details):
        df_processed = df_details
        logging.info("======== Basic data process started ========")
        eighth = self.sub_processor.eighth_process_basic_data(df_processed=df_processed, spark_session=self.spark_session)
        logging.info("=======: df_processed : =====")
        return eighth
    def ninth_process(self,df_details):
        df_processed = df_details
        logging.info("======== Basic data process started ========")
        ninth = self.sub_processor.ninth_process_basic_data(df_processed=df_processed, spark_session=self.spark_session)
        logging.info("=======: df_processed : =====")
        return ninth
    def tenth_process(self,df_details):
        df_processed = df_details
        logging.info("======== Basic data process started ========")
        tenth = self.sub_processor.tenth_process_basic_data(df_processed=df_processed, spark_session=self.spark_session)        
        logging.info("=======: df_processed : =====")
        return tenth
    def eleventh_process(self,df_details):
        df_processed = df_details
        logging.info("======== Basic data process started ========")
        eleventh = self.sub_processor.eleventh_process_basic_data(df_processed=df_processed, spark_session=self.spark_session)
        logging.info("=======: df_processed : =====")
        return eleventh
    def twelveth_process(self,df_details):
        df_processed = df_details
        logging.info("======== Basic data process started ========")
        twelveth = self.sub_processor.twelveth_process_basic_data(df_processed=df_processed, spark_session=self.spark_session)
        logging.info("=======: df_processed : =====")
        return twelveth
    def thirteenth_process(self,df_details):
        df_processed = df_details
        logging.info("======== Basic data process started ========")   
        thirteenth = self.sub_processor.thirteenth_process_basic_data(df_processed=df_processed, spark_session=self.spark_session)
        logging.info("=======: df_processed : =====")
        return thirteenth
    def fourteenth_process(self,df_details):
        df_processed = df_details
        logging.info("======== Basic data process started ========")
        fourteenth = self.sub_processor.fourteenth_process_basic_data(df_processed=df_processed, spark_session=self.spark_session)
        logging.info("=======: df_processed : =====")
        return fourteenth
    def fifteenth_process(self,df_details):
        df_processed = df_details
        logging.info("======== Basic data process started ========")
        fifteenth = self.sub_processor.fifteenth_process_basic_data(df_processed=df_processed, spark_session=self.spark_session)
        logging.info("=======: df_processed : =====")
        return fifteenth

In [2]:
# Build the Processor; its __init__ also creates (or reuses) the Spark session.
processor = Processor()

25/02/18 23:39:48 WARN Utils: Your hostname, hp resolves to a loopback address: 127.0.1.1; using 192.168.0.107 instead (on interface enp1s0)
25/02/18 23:39:48 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/siam/Downloads/inventroy-cron/env/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/siam/.ivy2/cache
The jars for the packages stored in: /home/siam/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-61a7de88-f68f-4131-b84c-48a886b9751c;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.5.2 in central
:: resolution report :: resolve 233ms :: artifacts dl 4ms
	:: modules in use:
	org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.5.2 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#

# read details data

In [3]:
# Load the raw details JSON via Processor.read_df_details.
df_details = processor.read_df_details()



In [4]:
# Preview the first five details rows.
df_details.show(n=5)

                                                                                

+------------------+------+----------------------+--------+--------------------+--------------------+--------------------+--------+----------------+--------------------+--------------------+---------------+--------------------+--------------------+--------------------+--------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+
|accommodation_type|brands|checkin_checkout_times|currency|       deep_link_url|         description|          facilities|      id|is_work_friendly|            location|                name|number_of_rooms|             payment|              photos|            policies|price_category|programmes|              rating|               rooms|    spoken_languages|              themes|                 url|
+------------------+------+----------------------+--------+--------------------+--------------------+--------------------+--------+----------------+--------------------+--------------------+--------

# read reviews score data

In [5]:
# Load the review-scores JSON via Processor.read_df_reviews_scores.
df_reviews_scores = processor.read_df_reviews_scores()



In [6]:
# Row count of the review-scores frame; as the cell's last expression it is displayed.
df_reviews_scores.count()

                                                                                

50

# read search data

In [7]:
# Load the accommodation-search JSON via Processor.read_df_search.
df_search = processor.read_df_search()



In [8]:
# Row count of the search frame; as the cell's last expression it is displayed.
df_search.count()

                                                                                

50

# read chain and brand data

In [9]:
# Load the chain-and-brand JSON via Processor.read_df_chain_and_brand.
df_chain_and_brand = processor.read_df_chain_and_brand()



In [10]:
# Row count of the chain-and-brand frame; as the cell's last expression it is displayed.
df_chain_and_brand.count()

50

# read reviews data

In [11]:
# Load the reviews JSON via Processor.read_df_reviews.
df_reviews = processor.read_df_reviews()



                                                                                

In [12]:
# Row count of the reviews frame; as the cell's last expression it is displayed.
df_reviews.count()

50

# build processed data (location processing over details)

In [13]:
# Run location processing over the raw details. The second value,
# count_df_null_location_id, is presumably the number of rows lacking a
# location id — confirm against SubProcessor.process_location_data_bg.
df_processed, count_df_null_location_id = processor.read_df_processed(df_details=df_details)

                                                                                

In [14]:
# Row count of the processed frame; as the cell's last expression it is displayed.
df_processed.count()

                                                                                

50

# remove duplicate rows (keyed by id)

In [15]:
# Deduplicate each input frame on its "id" column.
df_details = df_details.dropDuplicates(["id"])
df_reviews_scores = df_reviews_scores.dropDuplicates(["id"])
df_search = df_search.dropDuplicates(["id"])
df_reviews = df_reviews.dropDuplicates(["id"])
# df_processed was derived from df_details in an earlier cell, i.e. *before*
# the dedup above, and it is the frame the basic-data step cells consume.
# Dedupe it as well so the removal actually reaches the pipeline.
# NOTE(review): assumes process_location_data_bg keeps one row per id — confirm.
df_processed = df_processed.dropDuplicates(["id"])
logging.info("!!!Data count !!!")
print(df_details.count())
logging.info("====== Duplicates Removed =======")

2025-02-18 23:43:20,207 - INFO - !!!Data count !!!


50


# load broadcast

In [16]:
# Prepare broadcast data on the Spark session and register it via load_broadcast
# (see Processor.BroadCast).
processor.BroadCast()



# preprocess basic data

In [17]:
# Basic-data step 1 (its log reports property_name()); preview five rows.
df_processed = processor.first_process(df_details=df_processed)
df_processed.show(n=5)

2025-02-18 23:43:22,912 - INFO - >> sub-process : property_name() completed =====
25/02/18 23:43:23 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+------------------+------+----------------------+--------+--------------------+--------------------+--------------------+-----------+----------------+--------------------+--------------------+---------------+--------------------+--------------------+--------------------+--------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+----+----------------+--------+---------+---------+-----------+-----------+----------+-----+----------+--------------------+
|accommodation_type|brands|checkin_checkout_times|currency|       deep_link_url|         description|          facilities|         id|is_work_friendly|            location|                name|number_of_rooms|             payment|              photos|            policies|price_category|programmes|              rating|               rooms|    spoken_languages|              themes|                 url|country_code|feed|feed_provider_id|owner_id|      lat|  

In [18]:
# Basic-data step 2 (its log reports process_rating_and_score()); preview five rows.
df_processed = processor.second_process(df_details=df_processed)
df_processed.show(n=5)

2025-02-18 23:43:23,775 - INFO - >> sub-process : process_rating_and_score() completed =====


+------------------+------+----------------------+--------+--------------------+--------------------+--------------------+-----------+----------------+--------------------+--------------------+---------------+--------------------+--------------------+--------------------+--------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+----+----------------+--------+---------+---------+-----------+-----------+----------+-----+----------+--------------------+----------------+-----------+------------+--------------------+------------+
|accommodation_type|brands|checkin_checkout_times|currency|       deep_link_url|         description|          facilities|         id|is_work_friendly|            location|                name|number_of_rooms|             payment|              photos|            policies|price_category|programmes|              rating|               rooms|    spoken_languages|              themes| 

In [19]:
# Basic-data step 3 (its log reports process_checkin_checkout_times()); preview five rows.
df_processed = processor.third_process(df_details=df_processed)
df_processed.show(n=5)

2025-02-18 23:43:24,881 - INFO - >> sub-process : process_checkin_checkout_times() completed =====


+------------------+------+----------------------+--------+--------------------+--------------------+--------------------+-----------+----------------+--------------------+--------------------+---------------+--------------------+--------------------+--------------------+--------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+----+----------------+--------+---------+---------+-----------+-----------+----------+-----+----------+--------------------+----------------+-----------+------------+--------------------+------------+------------+----------+-------------+-----------+
|accommodation_type|brands|checkin_checkout_times|currency|       deep_link_url|         description|          facilities|         id|is_work_friendly|            location|                name|number_of_rooms|             payment|              photos|            policies|price_category|programmes|              rating|              

In [20]:
# Basic-data step 4 (its log reports process_property_slug()); preview five rows.
df_processed = processor.fourth_process(df_details=df_processed)
df_processed.show(n=5)

2025-02-18 23:43:25,582 - INFO - >> sub-process : process_property_slug() completed =====


+------------------+------+----------------------+--------+--------------------+--------------------+--------------------+-----------+----------------+--------------------+--------------------+---------------+--------------------+--------------------+--------------------+--------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+----+----------------+--------+---------+---------+-----------+-----------+----------+-----+----------+--------------------+----------------+-----------+------------+--------------------+------------+------------+----------+-------------+-----------+--------------------+----------+
|accommodation_type|brands|checkin_checkout_times|currency|       deep_link_url|         description|          facilities|         id|is_work_friendly|            location|                name|number_of_rooms|             payment|              photos|            policies|price_category|programmes|   

In [21]:
# Basic-data step 5 (its log reports process_property_type()); preview five rows.
df_processed = processor.fifth_process(df_details=df_processed)
df_processed.show(n=5)

2025-02-18 23:43:26,250 - INFO - >> sub-process : process_property_type() completed =====
[Stage 34:>                                                         (0 + 1) / 1]

+------------------+------+----------------------+--------+--------------------+--------------------+--------------------+-----------+----------------+--------------------+--------------------+---------------+--------------------+--------------------+--------------------+--------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+----+----------------+--------+---------+---------+-----------+-----------+----------+-----+----------+--------------------+----------------+-----------+------------+--------------------+------------+------------+----------+-------------+-----------+--------------------+----------+-------------+
|accommodation_type|brands|checkin_checkout_times|currency|       deep_link_url|         description|          facilities|         id|is_work_friendly|            location|                name|number_of_rooms|             payment|              photos|            policies|price_category|

                                                                                

In [22]:
# Basic-data step 6 (its log reports process_property_type_category()); preview five rows.
df_processed = processor.sixth_process(df_details=df_processed)
df_processed.show(n=5)

2025-02-18 23:43:31,223 - INFO - >> sub-process : process_property_type_category() completed =====
25/02/18 23:43:31 WARN DAGScheduler: Broadcasting large task binary with size 1288.6 KiB
[Stage 35:>                                                         (0 + 1) / 1]

+------------------+------+----------------------+--------+--------------------+--------------------+--------------------+-----------+----------------+--------------------+--------------------+---------------+--------------------+--------------------+--------------------+--------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+----+----------------+--------+---------+---------+-----------+-----------+----------+-----+----------+--------------------+----------------+-----------+------------+--------------------+------------+------------+----------+-------------+-----------+--------------------+----------+-------------+----------------------+
|accommodation_type|brands|checkin_checkout_times|currency|       deep_link_url|         description|          facilities|         id|is_work_friendly|            location|                name|number_of_rooms|             payment|              photos|            p

                                                                                

In [23]:
# Basic-data step 7 (its log reports process_property_type_categories()); preview five rows.
df_processed = processor.seventh_process(df_details=df_processed)
df_processed.show(n=5)

2025-02-18 23:43:33,728 - INFO - >> sub-process : process_property_type_categories() completed =====
25/02/18 23:43:34 WARN DAGScheduler: Broadcasting large task binary with size 1909.1 KiB
[Stage 36:>                                                         (0 + 1) / 1]

+------------------+------+----------------------+--------+--------------------+--------------------+--------------------+-----------+----------------+--------------------+--------------------+---------------+--------------------+--------------------+--------------------+--------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+----+----------------+--------+---------+---------+-----------+-----------+----------+-----+----------+--------------------+----------------+-----------+------------+--------------------+------------+------------+----------+-------------+-----------+--------------------+----------+-------------+----------------------+------------------------+
|accommodation_type|brands|checkin_checkout_times|currency|       deep_link_url|         description|          facilities|         id|is_work_friendly|            location|                name|number_of_rooms|             payment|         

                                                                                

In [24]:
# Basic-data step 8 (its log reports process_room_type()); preview five rows.
df_processed = processor.eighth_process(df_details=df_processed)
df_processed.show(n=5)

2025-02-18 23:43:36,511 - INFO - >> sub-process : process_room_type() completed =====
25/02/18 23:43:36 WARN DAGScheduler: Broadcasting large task binary with size 1909.6 KiB
[Stage 37:>                                                         (0 + 1) / 1]

+------------------+------+----------------------+--------+--------------------+--------------------+--------------------+-----------+----------------+--------------------+--------------------+---------------+--------------------+--------------------+--------------------+--------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+----+----------------+--------+---------+---------+-----------+-----------+----------+-----+----------+--------------------+----------------+-----------+------------+--------------------+------------+------------+----------+-------------+-----------+--------------------+----------+-------------+----------------------+------------------------+------------+
|accommodation_type|brands|checkin_checkout_times|currency|       deep_link_url|         description|          facilities|         id|is_work_friendly|            location|                name|number_of_rooms|             paym

                                                                                

In [None]:
# Stage 9; the recorded log for this cell reports process_amenities_data().
df_processed = processor.ninth_process(df_processed)
# NOTE(review): the recorded output for this cell shows the preview job
# (stages 38-40) still running with no table printed — re-run to refresh.
df_processed.show(n=5)

2025-02-18 23:43:41,332 - INFO - >> sub-process : process_amenities_data() completed =====
25/02/18 23:43:42 WARN DAGScheduler: Broadcasting large task binary with size 1925.0 KiB
[Stage 38:>(12 + 4) / 261][Stage 39:> (0 + 0) / 261][Stage 40:>   (0 + 0) / 4]1]

In [None]:
# Stage 10; the recorded log for this cell reports process_policies_data().
# NOTE(review): this cell's recorded timestamps (23:09) predate the ninth
# cell's (23:43), so the output below is from an earlier run.
df_processed = processor.tenth_process(df_processed)
df_processed.show(n=5)

2025-02-18 23:09:44,310 - INFO - >> sub-process : process_policies_data() completed =====
25/02/18 23:09:50 WARN DAGScheduler: Broadcasting large task binary with size 1961.3 KiB
25/02/18 23:09:56 WARN DAGScheduler: Broadcasting large task binary with size 2046.3 KiB


+--------+------------------+------+----------------------+--------+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+---------------+--------------------+--------------------+--------------------+--------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------+-----------+------------+--------------------+------------+------------+----------+-------------+-----------+--------------------+----------+------------------+----------------------+------------------------+------------+--------------------+--------------------+--------------------+--------------------+----------------------+---------------------------+--------------------+--------------------+
|      id|accommodation_type|brands|checkin_checkout_times|currency|       deep_link_url|         description|          facilities|is_work_friendly|            loc

                                                                                

In [None]:
# Stage 11; the recorded log for this cell reports process_min_stay().
df_processed = processor.eleventh_process(df_processed)
# Preview 5 rows of the updated frame.
df_processed.show(n=5)

2025-02-18 23:09:56,673 - INFO - >> sub-process : process_min_stay() completed =====
25/02/18 23:10:02 WARN DAGScheduler: Broadcasting large task binary with size 1961.3 KiB
25/02/18 23:10:06 WARN DAGScheduler: Broadcasting large task binary with size 2046.4 KiB


+--------+------------------+------+----------------------+--------+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+---------------+--------------------+--------------------+--------------------+--------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------+-----------+------------+--------------------+------------+------------+----------+-------------+-----------+--------------------+----------+------------------+----------------------+------------------------+------------+--------------------+--------------------+--------------------+--------------------+----------------------+---------------------------+--------------------+--------------------+--------+
|      id|accommodation_type|brands|checkin_checkout_times|currency|       deep_link_url|         description|          facilities|is_work_friendly|      

In [None]:
# Stage 12; the recorded log for this cell reports process_hotel_photos().
# The method is spelled "twelveth" (sic) where it is defined, so the call
# keeps that spelling.
df_processed = processor.twelveth_process(df_processed)
df_processed.show(n=5)

2025-02-18 23:10:07,041 - INFO - >> sub-process : process_hotel_photos() completed =====
25/02/18 23:10:12 WARN DAGScheduler: Broadcasting large task binary with size 1961.3 KiB
25/02/18 23:10:18 WARN DAGScheduler: Broadcasting large task binary with size 2.0 MiB


+--------+------------------+------+----------------------+--------+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+---------------+--------------------+--------------------+--------------------+--------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------+-----------+------------+--------------------+------------+------------+----------+-------------+-----------+--------------------+----------+------------------+----------------------+------------------------+------------+--------------------+--------------------+--------------------+--------------------+----------------------+---------------------------+--------------------+--------------------+--------+--------------------+--------------------+
|      id|accommodation_type|brands|checkin_checkout_times|currency|       deep_link_url|         description|  

In [None]:
# Stage 13; the recorded log for this cell reports process_rooms_data().
df_processed = processor.thirteenth_process(df_processed)
# Preview 5 rows of the updated frame.
df_processed.show(n=5)

2025-02-18 23:10:19,465 - INFO - >> sub-process : process_rooms_data() completed =====
25/02/18 23:10:26 WARN DAGScheduler: Broadcasting large task binary with size 1961.3 KiB
25/02/18 23:10:35 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB


+--------+------------------+------+----------------------+--------+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+---------------+--------------------+--------------------+--------------------+--------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------+-----------+------------+--------------------+------------+------------+----------+-------------+-----------+--------------------+----------+------------------+----------------------+------------------------+------------+--------------------+--------------------+--------------------+--------------------+----------------------+---------------------------+--------------------+--------------------+--------+--------------------+--------------------+-------------+--------------+-------------+--------------+--------+---------+
|      id|accommodation_type|brands

In [None]:
# Stage 14; the recorded log for this cell reports
# process_feed_provider_url_and_licence().
df_processed = processor.fourteenth_process(df_processed)
df_processed.show(n=5)

2025-02-18 23:10:36,546 - INFO - >> sub-process : process_feed_provider_url_and_licence() completed =====
25/02/18 23:10:43 WARN DAGScheduler: Broadcasting large task binary with size 1961.3 KiB
25/02/18 23:10:51 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB


+--------+------------------+------+----------------------+--------+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+---------------+--------------------+--------------------+--------------------+--------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------+-----------+------------+--------------------+------------+------------+----------+-------------+-----------+--------------------+----------+------------------+----------------------+------------------------+------------+--------------------+--------------------+--------------------+--------------------+----------------------+---------------------------+--------------------+--------------------+--------+--------------------+--------------------+-------------+--------------+-------------+--------------+--------+---------+--------------------+---------------

In [None]:
# Stage 15; the recorded log for this cell reports prepare_is_published().
df_processed = processor.fifteenth_process(df_processed)
# NOTE(review): the "large task binary" warnings grow across stages
# (~1.9 MiB -> 2.1 MiB) as the plan lengthens; truncating lineage
# (e.g. localCheckpoint()) may help — not applied here.
df_processed.show(n=5)

2025-02-18 23:10:52,221 - INFO - >> sub-process : prepare_is_published() completed =====
25/02/18 23:10:59 WARN DAGScheduler: Broadcasting large task binary with size 1961.3 KiB
25/02/18 23:11:09 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB


+--------+------------------+------+----------------------+--------+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+---------------+--------------------+--------------------+--------------------+--------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------+-----------+------------+--------------------+------------+------------+----------+-------------+-----------+--------------------+----------+------------------+----------------------+------------------------+------------+--------------------+--------------------+--------------------+--------------------+----------------------+---------------------------+--------------------+--------------------+--------+--------------------+--------------------+-------------+--------------+-------------+--------------+--------+---------+--------------------+---------------

                                                                                

# Preprocess review-score data

In [None]:
# Returns a (before, after) pair of DataFrames for the review-score step.
before_df_reviews, after_df_reviews = processor.preprocess_reviews_data(
    df_processed,
    df_reviews_scores,
    df_chain_and_brand,
    df_search,
)





In [None]:
#before_df_reviews.show(5)

In [None]:
# after_df_reviews.show(5)

In [None]:
# Carry the "before" frame forward as the working DataFrame.
# NOTE(review): the "after" frame is not used downstream here — confirm intended.
df_processed = before_df_reviews

# Process property-flag data

In [None]:
# Returns a (before, after) pair of DataFrames for the property-flag step.
before_df_property_flag, after_df_property_flag = (
    processor.preprocess_property_flags_data(
        df_processed,
        df_reviews_scores,
        df_chain_and_brand,
        df_search,
    )
)



2025-02-18 23:11:20,863 - INFO - ===== : Optimized process Property Flags Data Complete : =====


In [None]:
#before_df_property_flag.show(5)

In [None]:
#after_df_property_flag.show(5)

In [None]:
# Carry the "before" frame forward as the working DataFrame.
# NOTE(review): the "after" frame is not used downstream here — confirm intended.
df_processed = before_df_property_flag

# Process chain and brand data

In [None]:
# Returns a (before, after) pair of DataFrames for the chain/brand step.
# NOTE(review): the method name looks like two names fused together
# ("preprocess_basic_data" + "process_chain_and_brand_data"); it exists,
# since the cell ran, but confirm it is the intended method. The variable
# names also carry a "chian" typo, kept because later cells use them.
before_df_chian_brand, after_df_chian_brand = (
    processor.preprocess_basic_dataprocess_chain_and_brand_data(
        df_processed,
        df_reviews_scores,
        df_chain_and_brand,
        df_search,
    )
)

2025-02-18 23:11:21,720 - INFO - === : After Process Chain and Brand Data :====


In [None]:
# Preview 5 rows of the pre-step chain/brand frame (runs a Spark job).
before_df_chian_brand.show(n=5)

25/02/18 23:11:28 WARN DAGScheduler: Broadcasting large task binary with size 1961.3 KiB
25/02/18 23:11:36 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB


+--------+------------------+------+----------------------+--------+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+---------------+--------------------+--------------------+--------------------+--------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------+-----------+------------+--------------------+------------+------------+----------+-------------+-----------+--------------------+----------+------------------+----------------------+------------------------+------------+--------------------+--------------------+--------------------+--------------------+----------------------+---------------------------+--------------------+--------------------+--------+--------------------+--------------------+-------------+--------------+-------------+--------------+--------+---------+--------------------+---------------

                                                                                

In [None]:
# Preview 5 rows of the post-step chain/brand frame (runs a Spark job).
after_df_chian_brand.show(n=5)

25/02/18 23:11:44 WARN DAGScheduler: Broadcasting large task binary with size 1961.3 KiB
25/02/18 23:11:54 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
[Stage 269:>                                                        (0 + 1) / 1]

+--------+------------------+------+----------------------+--------+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+---------------+--------------------+--------------------+--------------------+--------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------+-----------+------------+--------------------+------------+------------+----------+-------------+-----------+--------------------+----------+------------------+----------------------+------------------------+------------+--------------------+--------------------+--------------------+--------------------+----------------------+---------------------------+--------------------+--------------------+--------+--------------------+--------------------+-------------+--------------+-------------+--------------+--------+---------+--------------------+---------------

                                                                                

In [None]:
# Carry the "before" frame forward as the working DataFrame.
# NOTE(review): the "after" frame is only previewed, not used — confirm intended.
df_processed = before_df_chian_brand

# Process commission and meal-plan data

In [None]:
# Returns a (before, after) pair of DataFrames for the commission/meal-plan step.
before_df_commission_meal, after_df_commission_meal = (
    processor.preprocess_commission_and_meal_plan_data(
        df_processed,
        df_reviews_scores,
        df_chain_and_brand,
        df_search,
    )
)

2025-02-18 23:11:56,230 - INFO - ===== : Optimized process Commission and Meal Plan Data Complete : =====


In [None]:
# before_df_commission_meal.show(5)

In [None]:
# after_df_commission_meal.show(5)

In [None]:
# Carry the "before" frame forward as the working DataFrame.
# NOTE(review): the "after" frame is not used downstream here — confirm intended.
df_processed = before_df_commission_meal

# 