# SETUP

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
import  pyspark.sql.functions as F 
import numpy as np
from pyspark.sql.types import *

import time
from datetime import datetime





--Download jars manually besides main jars in image(image: jupyter/pyspark-notebook:spark-3.5.0) and put it in jars folder that spark,jupyter can access it
-- in container must be in .....spark/external-jars  not ........spark/jars   to can overcome the overwrite
-- Why we cannot download it in container ? because every time restart docker the all downloaded volatile(as memory)
-- SO: to make some oprational files in container permanent must be in yaml file or in your local and container access it 

-- we install --> spark-excel_2.12-3.5.0_0.20.3.jar
   and some dependencies --> poi-ooxml-5.2.5.jar
                         --> commons-io-2.13.0.jar
                         --> commons-collections4-4.4.jar

 ####### All This Jars to make Spark can read excel
  ####### Another Solution: convert file on your loacl to csv      will be more effecience than excel                  

In [2]:

# get jars from local 

spark = SparkSession.builder \
    .appName("ExcelRead") \
    .master("spark://spark-master:7077") \
    .config("spark.jars", "/usr/local/spark/external-jars/spark-excel_2.12-3.5.0_0.20.3.jar," 
                             "/usr/local/spark/external-jars/poi-ooxml-5.2.5.jar,"
                             "/usr/local/spark/external-jars/commons-collections4-4.4.jar,"
                             "/usr/local/spark/external-jars/commons-io-2.13.0.jar") \
    .getOrCreate()


####### get jars online (neeed downloaded every time)

spark = SparkSession.builder \
    .appName("ExcelTest") \
    .config("spark.jars.packages", "com.crealytics:spark-excel_2.12:3.5.0_0.20.3") \
    .getOrCreate()

-- .config("spark.jars.packages",<online resource>)
-- .config("spark.jars",<offline resource>)

# SHOW FILES in SHARED FOLDER (OF CONTAINER)

In [18]:

import os ## os mean container only ~ and we have folder(directory) in container called "data" ("shared" in our local machine)

shared_path= "/data"


files=["hotels_full_data_c2.xlsx",
"hotels_reviews_data_v5.xlsx"
]

# PIPILINE TO BUILD BRONZE LAYER 

In [4]:
for file_name in files:
  print("="*30)
  print(f"‚è© File name : {file_name}")
  print("="*30)
  print("")
  print(f"{file_name } Spark Read file .... ‚åõ")

  df = spark.read \
    .format("com.crealytics.spark.excel") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(f"{shared_path}/{file_name}")
  
  print(f"{file_name } Successfully Readed ‚úÖ")
  print("_"*30)

  file_name=file_name[0:-5]+'_bronze'

  print(f"{file_name } Store in HDFS .... ‚åõ")

  df.write.mode("overwrite").parquet(f"hdfs://namenode:9000/bronze/{file_name}.parquet")
  print(f"{file_name} Succefuuly Stored ‚úÖ")

  print("_-"*50)
  print("")


‚è© File name : hotels_full_data_c2.xlsx

hotels_full_data_c2.xlsx Spark Read file .... ‚åõ
hotels_full_data_c2.xlsx Successfully Readed ‚úÖ
______________________________
hotels_full_data_c2_bronze Store in HDFS .... ‚åõ
hotels_full_data_c2_bronze Succefuuly Stored ‚úÖ
_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-

‚è© File name : hotels_reviews_data_v5.xlsx

hotels_reviews_data_v5.xlsx Spark Read file .... ‚åõ
hotels_reviews_data_v5.xlsx Successfully Readed ‚úÖ
______________________________
hotels_reviews_data_v5_bronze Store in HDFS .... ‚åõ
hotels_reviews_data_v5_bronze Succefuuly Stored ‚úÖ
_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-



## PIPILINE TO BUILD SILVER LAYER 

In [None]:
file_name="hotels_reviews_data_v1.xlsx"
df = spark.read \
    .format("com.crealytics.spark.excel") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(shared_path+'/'+file_name)

df.show(5)

df.printSchema()

+--------------------+---------------+-------------+--------------------+--------------------+
|          hotel_name|       reviewer|helpful_votes|                date|                text|
+--------------------+---------------+-------------+--------------------+--------------------+
|Reid's Palace, A ...|        David H|         NULL|David Hwrote a re...|A really special ...|
|Reid's Palace, A ...|     AlisonY850|         71.0|AlisonY850wrote a...|A really special ...|
|Reid's Palace, A ...|J. Kurt Schmidt|         NULL|J. Kurt Schmidtwr...|An outstanding st...|
|Reid's Palace, A ...|      TonyS1956|         13.0|TonyS1956wrote a ...|In all respects, ...|
|Reid's Palace, A ...|         Rose C|         31.0|Rose Cwrote a rev...|Our 1st time at R...|
+--------------------+---------------+-------------+--------------------+--------------------+
only showing top 5 rows

root
 |-- hotel_name: string (nullable = true)
 |-- reviewer: string (nullable = true)
 |-- helpful_votes: double (nulla

### hotel_name

In [None]:
print('_-'*50)
print('‚û§ checking hotel_name....')
print('-_'*50)
print("")
print("1-Spacing....")
print("")
df_hotel_name = df.filter( F.col("hotel_name") != F.trim(F.col("hotel_name"))).select("hotel_name")
num=df_hotel_name.count()
print(f"  Number of Spaced names : {num}")
if(num==0):
  print('  NO FIXED...‚úÖ')
else:
  df = df.withColumn("hotel_name", F.trim(F.col("hotel_name")))
  print('  SPACING FIXED...‚úÖ')

_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-
‚û§ checking hotel_name....
-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_

1-Spacing....

  Number of Spaced names : 0
  NO FIXED...‚úÖ


### reviewer

In [None]:
print('_-'*50)
print('‚û§ checking reviewer....')
print('-_'*50)
print("")
print("1-Spacing....")
print("")
df_reviewer = df.filter( F.col("reviewer") != F.trim(F.col("reviewer"))).select("reviewer")
num=df_reviewer.count()
print(f"  Number of Spaced names : {num}")
if(num==0):
  print('  NO FIXED...‚úÖ')
else:
  df = df.withColumn("reviewer", F.trim(F.col("reviewer")))
  print('  SPACEING FIXED...‚úÖ')

_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-
‚û§ checking reviewer....
-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_

1-Spacing....

  Number of Spaced names : 0
  NO FIXED...‚úÖ


### helpful_votes

In [None]:
print('_-' * 50)
print('‚û§ checking helpful_votes....')
print('-_' * 50)
print("")
print("1-Casting....")
print("")

df = df.withColumn("helpful_votes", F.col("helpful_votes").cast("int"))
print("  Casting DONE ‚úÖ")
print("")
print("2-Check Values....")

df_helpful_votes = df.filter(F.col("helpful_votes").isNull() | (F.col("helpful_votes") < 0))
num = df_helpful_votes.count()
print(f"  Number of Wrong Values : {num}")
if num == 0:
    print('  NO FIXED...‚úÖ')
else:
    df = df.withColumn(
        "helpful_votes",
        F.when((F.col("helpful_votes").isNull()) | (F.col("helpful_votes") < 0), 0)
         .otherwise(F.col("helpful_votes"))
    )
    print('  VALUES FIXED...‚úÖ')


_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-
‚û§ checking helpful_votes....
-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_

1-Casting....

  Casting DONE ‚úÖ

2-Check Values....
  Number of Wrong Values : 4734
  VALUES FIXED...‚úÖ


### date

In [14]:
print('_-' * 50)
print('‚û§ Checking date ....')
print('-_' * 50)
print("")

print("1-Checking for existing date values....")
date_count = df.filter(F.col("date").isNotNull() & (F.length(F.col("date")) > 0)).count()

if date_count == 0:
    print("  ‚ö†Ô∏è No date values found ‚Äî applying default regex pattern.")
    regex = r'\b(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\s*(\d{1,2})?\s*(\d{4})?\b'
else:
    print(f"  Found {date_count} non-empty date values ‚Äî using main regex.")
    regex = r'\b(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\s*(\d{1,2})?\s*(\d{4})?\b'

print("")
print("2-Applying regex extraction....")
df = df.withColumn("month", F.regexp_extract("date", regex, 1)) \
       .withColumn("year", F.regexp_extract("date", regex, 3))
print("  Regex extraction DONE ‚úÖ")

print("")
print("3-Fixing missing months....")
missing_months = df.filter((F.col("month") == "") | F.col("month").isNull()).count()
print(f"  Missing month count: {missing_months}")

if missing_months > 0:
    df = df.withColumn(
        "month",
        F.when((F.col("month") == "") | F.col("month").isNull(), "Oct").otherwise(F.col("month"))
    )
    print("  Missing months fixed with default value 'Oct' ‚úÖ")
else:
    print("  No missing months found ‚úÖ")

print("")
print("4-Filling missing years....")
df = df.withColumn("year", F.when(F.col("year") == "", "2025").otherwise(F.col("year")))
print("  Missing years handled ‚úÖ")

print("")
print("5-Creating formatted date column....")
df = df.withColumn("extracted_date_str", F.concat_ws(" ", F.col("month"), F.col("year"))) \
       .withColumn("extracted_date",
                   F.to_date(F.concat_ws("-", F.lit("01"), F.col("month"), F.col("year")),
                             "dd-MMM-yyyy"))
print("  Date column created ‚úÖ")

print("")
print("6-Sample preview:")
df.select("date", "month", "year", "extracted_date").show(10, truncate=False)


_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-
‚û§ Checking date ....
-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_

1-Checking for existing date values....
  Found 16587 non-empty date values ‚Äî using main regex.

2-Applying regex extraction....
  Regex extraction DONE ‚úÖ

3-Fixing missing months....
  Missing month count: 8
  Missing months fixed with default value 'Oct' ‚úÖ

4-Filling missing years....
  Missing years handled ‚úÖ

5-Creating formatted date column....
  Date column created ‚úÖ

6-Sample preview:
+-----------------------------------------+-----+----+--------------+
|date                                     |month|year|extracted_date|
+-----------------------------------------+-----+----+--------------+
|David Hwrote a review Oct 17             |Oct  |2025|2025-10-01    |
|AlisonY850wrote a review Oct 8           |Oct  |2025|2025-10-01    |
|J. Kurt Schmidtwro

### month

In [15]:
print('_-' * 50)
print('‚û§ Checking month ....')
print('-_' * 50)
print("")

# Check for null or empty month values
df_null_month = df.filter((F.col("month").isNull()) | (F.trim(F.col("month")) == ""))
null_count = df_null_month.count()

print(f"  Number of rows with NULL or empty month: {null_count}")
if null_count > 0:
    print("  ‚ö†Ô∏è Found missing or blank month values! ‚ùå")
    df_null_month.show()
else:
    print("  ‚úÖ No missing or blank month values found.")

print("")
#  Check distinct month values
distinct_months = df.select("month").distinct()
month_count = distinct_months.count()

print(f"  Number of distinct months found: {month_count}")
distinct_months.show(20, truncate=False)

# Validate count
if month_count > 12:
    print("  ‚ö†Ô∏è ERROR: More than 12 distinct months found (possible blank or invalid entry)! ‚ùå")
else:
    print("  ‚úÖ Month values are valid (‚â§ 12).")



_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-
‚û§ Checking month ....
-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_

  Number of rows with NULL or empty month: 0
  ‚úÖ No missing or blank month values found.



ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


  Number of distinct months found: 12


KeyboardInterrupt: 

### year

In [None]:
print('_-' * 50)
print('‚û§ Checking year....')
print('-_' * 50)
print("")

# Convert year to integer
df = df.withColumn("year_int", F.col("year").cast("int"))

# Check for null or invalid (non-numeric) years
df_null_year = df.filter(F.col("year").isNull() | (F.trim(F.col("year")) == ""))
null_year_count = df_null_year.count()
print(f"  Number of rows with NULL or empty year: {null_year_count}")

if null_year_count > 0:
    print("  ‚ö†Ô∏è Found missing or blank year values! ‚ùå")
    df_null_year.show()
else:
    print("  ‚úÖ No missing or blank year values found.")

print("")
# Check for out-of-range years (negative or > 2025)
df_invalid_years = df.filter((F.col("year_int") < 0) | (F.col("year_int") > 2025))
invalid_year_count = df_invalid_years.count()

print(f"  Number of invalid years (negative or > 2025): {invalid_year_count}")
if invalid_year_count > 0:
    print("  ‚ö†Ô∏è Found invalid year values! Fixing them to 2025...")
    df = df.withColumn(
        "year",
        F.when((F.col("year_int") < 0) | (F.col("year_int") > 2025) | F.col("year_int").isNull(),
               "2025").otherwise(F.col("year"))
    )
    print("  ‚úÖ Invalid years fixed to 2025.")
else:
    print("  ‚úÖ All year values are valid (0 ‚â§ year ‚â§ 2025).")

print("")



_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-
‚û§ Checking year....
-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_

  Number of rows with NULL or empty year: 0
  ‚úÖ No missing or blank year values found.

  Number of invalid years (negative or > 2025): 0
  ‚úÖ All year values are valid (0 ‚â§ year ‚â§ 2025).



## REVIEWS PIPELINE (CLEANING,TRANSFORMATION,TRACKING TIME,..)

In [19]:

# Utility function for timing
def log_time(stage_name, func):
    print('_-'*50)
    print(f'‚û§ {stage_name}....')
    print('-_'*50)
    start_time = time.time()
    print(f"‚è±Ô∏è  Start time: {datetime.now().strftime('%H:%M:%S')}")
    print("")

    try:
        func()
    except Exception as e:
        print(f"‚ùå Error in {stage_name}: {e}")
    finally:
        end_time = time.time()
        duration = round(end_time - start_time, 2)
        print(f"\n‚úÖ End time: {datetime.now().strftime('%H:%M:%S')}")
        print(f"üïí Duration: {duration} seconds")
        print('_-'*50 + "\n")
        

# ---------------- STAGE 1 ----------------
def check_hotel_name():
    try:
        print("1-Spacing....\n")
        df_hotel_name = df.filter(F.col("hotel_name") != F.trim(F.col("hotel_name"))).select("hotel_name")
        num = df_hotel_name.count()
        print(f"  Number of Spaced names : {num}")
        if num == 0:
            print('  NO FIXED...‚úÖ')
        else:
            globals()['df'] = df.withColumn("hotel_name", F.trim(F.col("hotel_name")))
            print('  SPACING FIXED...‚úÖ')
    except Exception as e:
        print(f"‚ùå Exception in hotel_name stage: {e}")

# ---------------- STAGE 2 ----------------
def check_reviewer():
    try:
        print("1-Spacing....\n")
        df_reviewer = df.filter(F.col("reviewer") != F.trim(F.col("reviewer"))).select("reviewer")
        num = df_reviewer.count()
        print(f"  Number of Spaced names : {num}")
        if num == 0:
            print('  NO FIXED...‚úÖ')
        else:
            globals()['df'] = df.withColumn("reviewer", F.trim(F.col("reviewer")))
            print('  SPACING FIXED...‚úÖ')
    except Exception as e:
        print(f"‚ùå Exception in reviewer stage: {e}")

# ---------------- STAGE 3 ----------------
def check_helpful_votes():
    try:
        print("1-Casting....\n")
        globals()['df'] = df.withColumn("helpful_votes", F.col("helpful_votes").cast("int"))
        print("  Casting DONE ‚úÖ\n")

        print("2-Check Values....")
        df_helpful_votes = df.filter(F.col("helpful_votes").isNull() | (F.col("helpful_votes") < 0))
        num = df_helpful_votes.count()
        print(f"  Number of Wrong Values : {num}")
        if num == 0:
            print('  NO FIXED...‚úÖ')
        else:
            globals()['df'] = df.withColumn(
                "helpful_votes",
                F.when((F.col("helpful_votes").isNull()) | (F.col("helpful_votes") < 0), 0)
                 .otherwise(F.col("helpful_votes"))
            )
            print('  VALUES FIXED...‚úÖ')
    except Exception as e:
        print(f"‚ùå Exception in helpful_votes stage: {e}")

# ---------------- STAGE 4 ----------------
def check_date():
    try:
        print("1-Checking for existing date values....")
        date_count = df.filter(F.col("date").isNotNull() & (F.length(F.col("date")) > 0)).count()

        if date_count == 0:
            print("  ‚ö†Ô∏è No date values found ‚Äî applying default regex pattern.")
            regex = r'\b(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\s*(\d{1,2})?\s*(\d{4})?\b'
        else:
            print(f"  Found {date_count} non-empty date values ‚Äî using main regex.")
            regex = r'\b(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\s*(\d{1,2})?\s*(\d{4})?\b'

        print("\n2-Applying regex extraction....")
        globals()['df'] = df.withColumn("month", F.regexp_extract("date", regex, 1)) \
                           .withColumn("year", F.regexp_extract("date", regex, 3))
        print("  Regex extraction DONE ‚úÖ")

        print("\n3-Fixing missing months....")
        missing_months = df.filter((F.col("month") == "") | F.col("month").isNull()).count()
        print(f"  Missing month count: {missing_months}")
        if missing_months > 0:
            globals()['df'] = df.withColumn(
                "month",
                F.when((F.col("month") == "") | F.col("month").isNull(), "Oct").otherwise(F.col("month"))
            )
            print("  Missing months fixed with default value 'Oct' ‚úÖ")
        else:
            print("  No missing months found ‚úÖ")

        print("\n4-Filling missing years....")
        globals()['df'] = df.withColumn("year", F.when(F.col("year") == "", "2025").otherwise(F.col("year")))
        print("  Missing years handled ‚úÖ")

        print("\n5-Creating formatted date column....")
        globals()['df'] = df.withColumn("extracted_date_str", F.concat_ws(" ", F.col("month"), F.col("year"))) \
                           .withColumn("extracted_date",
                                       F.to_date(F.concat_ws("-", F.lit("01"), F.col("month"), F.col("year")),
                                                 "dd-MMM-yyyy"))
        print("  Date column created ‚úÖ")

        

    except Exception as e:
        print(f"‚ùå Exception in date stage: {e}")

# ---------------- STAGE 5 ----------------
def check_month():
    try:
        print("1-Checking null or empty months....\n")
        df_null_month = df.filter((F.col("month").isNull()) | (F.trim(F.col("month")) == ""))
        null_count = df_null_month.count()
        print(f"  Number of rows with NULL or empty month: {null_count}")
        if null_count > 0:
            print("  ‚ö†Ô∏è Found missing or blank month values! ‚ùå")
            df_null_month.show()
        else:
            print("  ‚úÖ No missing or blank month values found.")

        print("\n2-Checking distinct months....")
        distinct_months = df.select("month").distinct()
        month_count = distinct_months.count()
        print(f"  Number of distinct months found: {month_count}")
        distinct_months.show(20, truncate=False)

        if month_count > 12:
            print("  ‚ö†Ô∏è ERROR: More than 12 distinct months found! ‚ùå")
        else:
            print("  ‚úÖ Month values are valid (‚â§ 12).")
    except Exception as e:
        print(f"‚ùå Exception in month stage: {e}")

# ---------------- STAGE 6 ----------------
def check_year():
    try:
        print("1-Casting and validation....\n")
        globals()['df'] = df.withColumn("year_int", F.col("year").cast("int"))

        df_null_year = df.filter(F.col("year").isNull() | (F.trim(F.col("year")) == ""))
        null_year_count = df_null_year.count()
        print(f"  Number of rows with NULL or empty year: {null_year_count}")
        if null_year_count > 0:
            print("  ‚ö†Ô∏è Found missing or blank year values! ‚ùå")
            df_null_year.show()
        else:
            print("  ‚úÖ No missing or blank year values found.")

        print("\n2-Checking invalid year range....")
        df_invalid_years = df.filter((F.col("year_int") < 0) | (F.col("year_int") > 2025))
        invalid_year_count = df_invalid_years.count()
        print(f"  Number of invalid years (negative or > 2025): {invalid_year_count}")
        if invalid_year_count > 0:
            print("  ‚ö†Ô∏è Found invalid year values! Fixing them to 2025...")
            globals()['df'] = df.withColumn(
                "year",
                F.when((F.col("year_int") < 0) | (F.col("year_int") > 2025) | F.col("year_int").isNull(),
                       "2025").otherwise(F.col("year"))
            )
            print("  ‚úÖ Invalid years fixed to 2025.")
        else:
            print("  ‚úÖ All year values are valid (0 ‚â§ year ‚â§ 2025).")

        
    except Exception as e:
        print(f"‚ùå Exception in year stage: {e}")


# ======= RUN STAGES WITH TIMING =======
#log_time("Checking hotel_name", check_hotel_name)
#log_time("Checking reviewer", check_reviewer)
#log_time("Checking helpful_votes", check_helpful_votes)
#log_time("Checking date", check_date)
#log_time("Checking month", check_month)
#log_time("Checking year", check_year)


## UPLOAD in HDFS SILVER

In [20]:

for file_name in files:

    if 'reviews' in file_name:
        print("="*30)
        print(f"‚è© File name : {file_name}")
        print("="*30)
        print("")

        try:
            print(f"{file_name} Spark Read file .... ‚åõ")
            df = spark.read \
                .format("com.crealytics.spark.excel") \
                .option("header", "true") \
                .option("inferSchema", "true") \
                .load(shared_path + '/' + file_name)
            print(f"{file_name} Spark Read DONE ‚úÖ\n")
        except Exception as e:
            print(f"‚ùå Error reading file {file_name}: {e}")
            continue  # skip this file if reading fails

        # ======= Run cleaning/validation stages with timing =======
        log_time("Checking hotel_name", check_hotel_name)
        log_time("Checking reviewer", check_reviewer)
        log_time("Checking helpful_votes", check_helpful_votes)
        log_time("Checking date", check_date)
        log_time("Checking month", check_month)
        log_time("Checking year", check_year)

        print(f"{file_name} Successfully Processed ‚úÖ")
        print("_"*30)

        # Prepare silver file name
        silver_file_name = file_name[0:-5] + '_silver'

        try:
            print(f"{silver_file_name} Store in HDFS .... ‚åõ")
            df.write.mode("overwrite").parquet(f"hdfs://namenode:9000/silver/{silver_file_name}.parquet")
            print(f"{silver_file_name} Successfully Stored ‚úÖ")
        except Exception as e:
            print(f"‚ùå Error writing {silver_file_name} to HDFS: {e}")

        print("_-"*50)
        print("")



      

‚è© File name : hotels_reviews_data_v5.xlsx

hotels_reviews_data_v5.xlsx Spark Read file .... ‚åõ
hotels_reviews_data_v5.xlsx Spark Read DONE ‚úÖ

_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-
‚û§ Checking hotel_name....
-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_
‚è±Ô∏è  Start time: 21:49:18

1-Spacing....

  Number of Spaced names : 0
  NO FIXED...‚úÖ

‚úÖ End time: 21:49:18
üïí Duration: 0.53 seconds
_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-

_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-
‚û§ Checking reviewer....
-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_
‚è±Ô∏è  Start time: 21:49:18

1-Spacing....

  Number of Spaced names : 0
  NO FIXED...‚úÖ

‚úÖ End time: 21:49:19
üïí Duration: 0.51 seconds
_-_-_-_

## Hotels PIPELINE

In [None]:
file_name="hotels_full_data_c1.xlsx"
df = spark.read \
    .format("com.crealytics.spark.excel") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(shared_path+'/'+file_name)

df.show(5)

df.printSchema()

In [None]:
print('_-'*50)
print('‚û§ checking hotel_name....')
print('-_'*50)
print("")
print("1-Spacing....")
print("")
df_hotel_name = df.filter( F.col("name") != F.trim(F.col("name"))).select("name")
num=df_hotel_name.count()
print(f"  Number of Spaced names : {num}")
if(num==0):
  print('  NO FIXED...‚úÖ')
else:
  df = df.withColumn("name", F.trim(F.col("name")))
  print('  SPACING FIXED...‚úÖ')

In [None]:
from pyspark.sql.functions import regexp_replace, concat, lit, col

df = df.withColumn(
    "gps_link",
    concat(
        lit("https://www.google.com/maps/search/?api=1&query="),
        regexp_replace(col("location"), " ", "+")
    )
)

df.select("location", "gps_link").show(truncate=False)


In [None]:
from pyspark.sql.functions import split, explode, trim, col

# Split the comma-separated amenities into an array
df_split = df.withColumn("amenity", explode(split(col("amenities"), ",")))

# Trim spaces
df_split = df_split.withColumn("amenity", trim(col("amenity")))

# Get distinct amenities
unique_amenities = df_split.select("amenity").distinct()

# Collect to Python list
amenities_list = [row["amenity"] for row in unique_amenities.collect()]

print(len(amenities_list))


In [None]:
from pyspark.sql.functions import col

for amenity in amenities_list:
    clean_name = amenity.lower().replace(" ", "_").replace("/", "_").replace("'", "").replace("(", "").replace(")", "")
    df = df.withColumn(
        clean_name,
        col("amenities").contains(amenity).cast("int")
    )

In [None]:
df.show(10)

In [None]:
df_rating = df.filter(
   (col("rating") < 0) | (col("rating") > 5)
).show()

In [None]:
df_price = df.filter((col("price") < 0)).select("name")


In [None]:
from pyspark.sql.functions import split, col

# Split the images column by comma
df = df.withColumn("images_array", split(col("images"), ","))

# Create 5 new columns from the array elements
for i in range(5):
    df = df.withColumn(f"image{i+1}", col("images_array")[i])


In [None]:
df.select("image1").show(truncate=False)


In [21]:
# Utility function for timing
def log_time(stage_name, func):
    print('_-'*50)
    print(f'‚û§ {stage_name}....')
    print('-_'*50)
    start_time = time.time()
    print(f"‚è±Ô∏è  Start time: {datetime.now().strftime('%H:%M:%S')}")
    print("")

    try:
        func()
    except Exception as e:
        print(f"‚ùå Error in {stage_name}: {e}")
    finally:
        end_time = time.time()
        duration = round(end_time - start_time, 2)
        print(f"\n‚úÖ End time: {datetime.now().strftime('%H:%M:%S')}")
        print(f"üïí Duration: {duration} seconds")
        print('_-'*50 + "\n")



# ---------------- STAGE 1 ----------------
def check_hotel_name():
    try:
        print('_-' * 50)
        print('‚û§ Checking hotel_name....')
        print('-_' * 50)
        print("")
        print("1-Spacing....")
        print("")

        df_hotel_name = df.filter(F.col("name") != F.trim(F.col("name"))).select("name")
        num = df_hotel_name.count()
        print(f"  Number of Spaced names : {num}")
        if num == 0:
            print('  NO FIXED...‚úÖ')
        else:
            globals()['df'] = df.withColumn("name", F.trim(F.col("name")))
            print('  SPACING FIXED...‚úÖ')

    except Exception as e:
        print(f"‚ùå Exception in hotel_name stage: {e}")


# ---------------- STAGE 2 ----------------
def create_gps_link():
    try:
        print('_-' * 50)
        print('‚û§ Creating GPS link for each location....')
        print('-_' * 50)
        print("")

        globals()['df'] = df.withColumn(
            "gps_link",
            F.concat(
                F.lit("https://www.google.com/maps/search/?api=1&query="),
                F.regexp_replace(F.col("location"), " ", "+")
            )
        )

        print("  GPS link column created successfully ‚úÖ")
        print("  Sample links:")
        df.select("location", "gps_link").show(5, truncate=False)

    except Exception as e:
        print(f"‚ùå Exception in GPS link stage: {e}")


# ---------------- STAGE 3 ----------------
def extract_amenities():
    try:
        print('_-' * 50)
        print('‚û§ Extracting amenities....')
        print('-_' * 50)
        print("")

        df_split = df.withColumn("amenity", F.explode(F.split(F.col("amenities"), ",")))
        df_split = df_split.withColumn("amenity", F.trim(F.col("amenity")))

        unique_amenities = df_split.select("amenity").distinct()
        amenities_list = [row["amenity"] for row in unique_amenities.collect()]
        print(f"  Total unique amenities found: {len(amenities_list)}")

        for amenity in amenities_list:
            clean_name = (
                amenity.lower()
                .replace(" ", "_")
                .replace("/", "_")
                .replace("'", "")
                .replace("(", "")
                .replace(")", "")
            )
            globals()['df'] = df.withColumn(
                clean_name,
                F.col("amenities").contains(amenity).cast("int")
            )

        print("  Amenity columns created successfully ‚úÖ")

    except Exception as e:
        print(f"‚ùå Exception in amenities stage: {e}")


# ---------------- STAGE 4 ----------------
def check_rating():
    try:
        print('_-' * 50)
        print('‚û§ Checking rating values....')
        print('-_' * 50)
        print("")

        df_invalid = df.filter((F.col("rating") < 0) | (F.col("rating") > 5))
        invalid_count = df_invalid.count()
        print(f"  Number of invalid ratings (<0 or >5): {invalid_count}")

        if invalid_count == 0:
            print("  All ratings are valid ‚úÖ")
        else:
            globals()['df'] = df.withColumn(
                "rating",
                F.when((F.col("rating") < 0) | (F.col("rating") > 5), 0)
                 .otherwise(F.col("rating"))
            )
            print("  Invalid ratings fixed to 0 ‚úÖ")

    except Exception as e:
        print(f"‚ùå Exception in rating stage: {e}")


# ---------------- STAGE 5 ----------------
def check_price():
    try:
        print('_-' * 50)
        print('‚û§ Checking price values....')
        print('-_' * 50)
        print("")

        df_negative_price = df.filter(F.col("price") < 0).select("name")
        count_neg = df_negative_price.count()
        print(f"  Number of hotels with negative price: {count_neg}")

        if count_neg == 0:
            print("  All prices are valid ‚úÖ")
        else:
            globals()['df'] = df.withColumn(
                "price",
                F.when(F.col("price") < 0, 0).otherwise(F.col("price"))
            )
            print("  Negative prices fixed to 0 ‚úÖ")

    except Exception as e:
        print(f"‚ùå Exception in price stage: {e}")


# ---------------- STAGE 6 ----------------
def split_images():
    try:
        print('_-' * 50)
        print('‚û§ Splitting images into 5 columns....')
        print('-_' * 50)
        print("")

        globals()['df'] = df.withColumn("images_array", F.split(F.col("images"), ","))

        for i in range(5):
            globals()['df'] = df.withColumn(f"image{i+1}", F.col("images_array")[i])

        print("  Image columns created successfully ‚úÖ")
        df.select("name", "image1", "image2", "image3", "image4", "image5").show(3, truncate=False)

    except Exception as e:
        print(f"‚ùå Exception in images stage: {e}")


 #======= RUN NEW STAGES WITH TIMING =======
#log_time("Checking hotel_name", check_hotel_name)
#log_time("Creating GPS link", create_gps_link)
#log_time("Extracting amenities", extract_amenities)
#log_time("Checking rating", check_rating)
#log_time("Checking price", check_price)
#log_time("Splitting images", split_images)


In [22]:
for file_name in files:

    if 'full_data' in file_name:
        print("="*30)
        print(f"‚è© File name : {file_name}")
        print("="*30)
        print("")

        try:
            print(f"{file_name} Spark Read file .... ‚åõ")
            df = spark.read \
                .format("com.crealytics.spark.excel") \
                .option("header", "true") \
                .option("inferSchema", "true") \
                .load(shared_path + '/' + file_name)
            print(f"{file_name} Spark Read DONE ‚úÖ\n")
        except Exception as e:
            print(f"‚ùå Error reading file {file_name}: {e}")
            continue  # skip this file if reading fails

        #======= RUN NEW STAGES WITH TIMING =======
        log_time("Checking hotel_name", check_hotel_name)
        log_time("Creating GPS link", create_gps_link)
        log_time("Extracting amenities", extract_amenities)
        log_time("Checking rating", check_rating)
        log_time("Checking price", check_price)
        log_time("Splitting images", split_images)

        print(f"{file_name} Successfully Processed ‚úÖ")
        print("_"*30)

        # Prepare silver file name
        silver_file_name = file_name[0:-5] + '_silver'

        try:
            print(f"{silver_file_name} Store in HDFS .... ‚åõ")
            df.write.mode("overwrite").parquet(f"hdfs://namenode:9000/silver/{silver_file_name}.parquet")
            print(f"{silver_file_name} Successfully Stored ‚úÖ")
        except Exception as e:
            print(f"‚ùå Error writing {silver_file_name} to HDFS: {e}")

        print("_-"*50)
        print("")



      

‚è© File name : hotels_full_data_c2.xlsx

hotels_full_data_c2.xlsx Spark Read file .... ‚åõ
hotels_full_data_c2.xlsx Spark Read DONE ‚úÖ

_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-
‚û§ Checking hotel_name....
-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_
‚è±Ô∏è  Start time: 21:50:34

_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-
‚û§ Checking hotel_name....
-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_

1-Spacing....

  Number of Spaced names : 0
  NO FIXED...‚úÖ

‚úÖ End time: 21:50:34
üïí Duration: 0.15 seconds
_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-

_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-
‚û§ Creating GPS link....
-_-_-_-_-_-_-_-_-_-_-_-_-_-_-

## TRANSFER DATA TO MY LOCAL PC

In [1]:
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder \
    .appName("SaveParquetToCSV") \
    .getOrCreate()


## merge all reviews in one table

In [None]:
df = spark.read.parquet("hdfs://namenode:9000/silver/hotels_reviews_data_*.parquet")

# Drop the column you don't need
df = df.drop("images_array")

# Write as single CSV file
output_path = "/data/hotels_reviews_data_v_silver_csv"
df.coalesce(1).write \
    .option("header", "true") \
    .mode("overwrite") \
    .csv(output_path)

print(f"‚úÖ Successfully written CSV to {output_path}")



‚úÖ Successfully written CSV to /data/hotels_full_data_c2_silver_csv


## merge all hotels data in one table

to merge 2 files must be same but there are files have properties and other not so we extract all anemities columns put it in list and mapping to every file 

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

spark = SparkSession.builder.appName("HotelsDataMerge").getOrCreate()


df1 = spark.read.parquet("hdfs://namenode:9000/silver/hotels_full_data_c1_silver.parquet")
df2 = spark.read.parquet("hdfs://namenode:9000/silver/hotels_full_data_c2_silver.parquet")

core_columns = [
    "name", "location", "amenities", "rating", "price",
    "images", "gps_link", "image1", "image2", "image3", "image4", "image5"
]


all_columns = list(set(df1.columns) | set(df2.columns))

extra_columns = [c for c in all_columns if c not in core_columns]

def add_missing_extra_columns(df, extra_cols):
    for c in extra_cols:
        if c not in df.columns:
            df = df.withColumn(c, lit(0))

    return df.select(core_columns + sorted(extra_cols))


df1 = add_missing_extra_columns(df1, extra_columns)
df2 = add_missing_extra_columns(df2, extra_columns)
df1=df1.drop("images_array")
df2=df2.drop("images_array")

df_merged = df1.unionByName(df2)



df_merged.coalesce(1).write.option("header", "true").mode("overwrite").csv("/data/hotels_full_data_c_silver_csv")

print("‚úÖ Merge completed and CSV written successfully")


‚úÖ Merge completed and CSV written successfully


In [3]:
!pip install pandas transformers torch tqdm plotly

Collecting transformers
  Downloading transformers-4.57.1-py3-none-any.whl.metadata (43 kB)
[2K     [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m44.0/44.0 kB[0m [31m178.5 kB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hCollecting torch
  Downloading torch-2.9.1-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (30 kB)
Collecting plotly
  Downloading plotly-6.5.0-py3-none-any.whl.metadata (8.5 kB)
Collecting filelock (from transformers)
  Downloading filelock-3.20.0-py3-none-any.whl.metadata (2.1 kB)
Collecting huggingface-hub<1.0,>=0.34.0 (from transformers)
  Downloading huggingface_hub-0.36.0-py3-none-any.whl.metadata (14 kB)
Collecting regex!=2019.12.17 (from transformers)
  Downloading regex-2025.11.3-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl.metadata (40 kB)
[2K     [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚î

In [5]:
!pip install --upgrade typing-extensions



In [None]:
# ==========================================
# üèóÔ∏è PART 4: PYTHON POST-PROCESSING (PANDAS)
# ==========================================

import pandas as pd
import glob
import os
import sys
from tqdm import tqdm
from transformers import pipeline

# ÿ™ÿ´ÿ®Ÿäÿ™ ÿßŸÑŸÖŸÉÿ™ÿ®ÿßÿ™ ŸÑŸà ŸÖÿ¥ ŸÖŸàÿ¨ŸàÿØÿ© ŸÅŸä ÿßŸÑŸÉŸàŸÜÿ™ŸäŸÜÿ±
# !pip install pandas transformers torch tqdm plotly

print("üöÄ Starting Python Post-Processing...")

# --- Helper Function to read Spark CSV Output ---
# Spark saves CSVs as folders (e.g. /data/folder_name/part-0000.csv)
def read_spark_csv(folder_path):
    try:
        # Find the .csv file inside the folder
        csv_files = glob.glob(os.path.join(folder_path, "*.csv"))
        if not csv_files:
            raise FileNotFoundError(f"No CSV files found in {folder_path}")
        
        # ÿßŸÑŸÉŸàÿØ ÿßŸÑŸÖÿπÿØŸÑ ŸáŸÜÿß: ÿ•ÿ∂ÿßŸÅÿ© on_bad_lines='skip' Ÿà engine='python'
        return pd.read_csv(
            csv_files[0], 
            on_bad_lines='skip', 
            engine='python' # Ÿäÿ≥ÿ™ÿÆÿØŸÖ ŸÖÿ≠ÿ±ŸÉ ÿßŸÑÿ®ÿßŸäÿ´ŸàŸÜ ÿßŸÑÿ£ŸÉÿ´ÿ± ŸÖÿ±ŸàŸÜÿ©
        )
    except Exception as e:
        print(f"‚ö†Ô∏è Error reading Spark output from {folder_path}: {e}")
        return pd.DataFrame()

# ==========================================
# üìç STEP 1: MERGE GEOLOCATION DATA
# ==========================================
print("\nüìç Starting Location Merge...")

# 1. Read Hotel Data (Produced by Spark)
# Note: Path is /data/ because it's mapped in Docker
hotels_df = read_spark_csv('/data/hotels_full_data_c_silver_csv')
print(f"   Loaded Hotels Data: {len(hotels_df)} rows")

if not hotels_df.empty:
    # Clean duplicates
    hotels_df = hotels_df.drop_duplicates(subset=['name', 'location'])
    
    # 2. Read Location Files from /data/location_l_l
    loc_path = '/data/location_l_l'
    all_loc_files = glob.glob(os.path.join(loc_path, "*.csv"))
    
    loc_list = []
    for filename in all_loc_files:
        try:
            df_temp = pd.read_csv(filename)
            loc_list.append(df_temp)
        except:
            pass

    if loc_list:
        concat_location_data = pd.concat(loc_list, ignore_index=True)
        
        # Keep only necessary columns
        cols_to_keep = ['location', 'Latitude', 'Longitude']
        if 'OpenCage Note' in concat_location_data.columns:
            cols_to_keep.append('OpenCage Note')
            
        location_clean = concat_location_data[cols_to_keep].drop_duplicates(subset=['location'])
        
        # 3. Merge
        final_hotels = pd.merge(hotels_df, location_clean, on='location', how='left')
        
        # Save Intermediate result
        final_hotels.to_csv('/data/final_data_without_sentiment.csv', index=False)
        print(f"‚úÖ Location Merge Done. Saved /data/final_data_without_sentiment.csv ({len(final_hotels)} rows)")
    else:
        print("‚ö†Ô∏è No location files found in /data/location_l_l. Skipping merge.")
        final_hotels = hotels_df
else:
    print("‚ùå Hotels data is empty. Skipping location merge.")
    final_hotels = pd.DataFrame()

# ==========================================
# üß† STEP 2: SENTIMENT ANALYSIS (REVIEWS)
# ==========================================
print("\nüß† Starting Sentiment Analysis...")

# Settings
# Input comes from Spark's reviews output
reviews_folder = '/data/hotels_reviews_data_v_silver_csv' 
output_file = '/data/full_reviews_scored.csv'
save_every_n_rows = 500

# Load Model
print("   Loading NLP Model...")
sentiment_pipeline = pipeline("sentiment-analysis", 
                              model="nlptown/bert-base-multilingual-uncased-sentiment", 
                              truncation=True, 
                              max_length=512)

def analyze_sentiment(text):
    if pd.isna(text) or str(text).strip() == "": return None
    try:
        result = sentiment_pipeline(str(text))[0]
        return int(result['label'].split(' ')[0])
    except: return None

# Load Data
if os.path.exists(output_file):
    print(f"   Resuming from {output_file}...")
    df_reviews = pd.read_csv(output_file)
else:
    print(f"   Reading fresh reviews from {reviews_folder}...")
    df_reviews = read_spark_csv(reviews_folder)
    df_reviews['sentiment_score'] = None

if not df_reviews.empty:
    # Identify rows to process
    # If column doesn't exist (fresh load), create it
    if 'sentiment_score' not in df_reviews.columns:
        df_reviews['sentiment_score'] = None
        
    rows_to_process = df_reviews[df_reviews['sentiment_score'].isna()].index
    
    print(f"   Total Reviews: {len(df_reviews)}")
    print(f"   Reviews to process: {len(rows_to_process)}")
    
    if len(rows_to_process) > 0:
        processed_count = 0
        for index in tqdm(rows_to_process, desc="Analysing Sentiment"):
            review_text = df_reviews.at[index, 'text']
            score = analyze_sentiment(review_text)
            df_reviews.at[index, 'sentiment_score'] = score
            
            processed_count += 1
            if processed_count % save_every_n_rows == 0:
                df_reviews.to_csv(output_file, index=False)
        
        # Final Save
        df_reviews.to_csv(output_file, index=False)
        print("‚úÖ Sentiment Analysis Complete.")
    else:
        print("   All reviews already analyzed.")

    # ==========================================
    # üèÅ STEP 3: FINAL INTEGRATION (HOTELS + SCORES)
    # ==========================================
    print("\nüèÅ Creating Final Dataset for App...")
    
    # Aggregate Sentiment per Hotel
    df_clean_reviews = df_reviews.dropna(subset=['sentiment_score'])
    df_clean_reviews['sentiment_score'] = df_clean_reviews['sentiment_score'].astype(int)
    
    # Calculate average rating per hotel
    sentiment_summary = df_clean_reviews.groupby('hotel_name')['sentiment_score'].mean().reset_index()
    sentiment_summary.rename(columns={'hotel_name': 'name', 'sentiment_score': 'ai_score'}, inplace=True)
    
    # Merge with Hotels Data
    if not final_hotels.empty:
        # Merge on Name
        # Note: Make sure names match (trimming was done in Spark)
        final_dataset = pd.merge(final_hotels, sentiment_summary, on='name', how='left')
        
        # Fill missing scores with default 4.0
        final_dataset['ai_score'] = final_dataset['ai_score'].fillna(4.0)
        
        # Save FINAL file for the Web App
        final_dataset.to_csv('/data/final_data.csv', index=False)
        print(f"üéâ SUCCESS! Final dataset saved to: /data/final_data.csv")
        print(final_dataset.head(3))
    else:
        print("‚ùå Could not create final dataset (Missing Hotels Data)")

else:
    print("‚ùå No reviews data found.")

KeyboardInterrupt: 

In [4]:
# ==========================================
# üèóÔ∏è PART 4: FINAL ASSEMBLY (FAST MODE)
# ==========================================

import pandas as pd
import glob
import os
import sys

print("üöÄ Starting Final Assembly (Fast Mode using Summary)...")

# --- ŸÖÿ≥ÿßÿ±ÿßÿ™ ÿßŸÑŸÖŸÑŸÅÿßÿ™ (ÿØÿßÿÆŸÑ ÿßŸÑÿØŸàŸÉÿ±) ---
# 1. ŸÖŸÑŸÅÿßÿ™ ÿßŸÑŸÅŸÜÿßÿØŸÇ ŸàÿßŸÑŸÑŸàŸÉŸäÿ¥ŸÜ
hotels_input_path = '/data/hotels_full_data_c_silver_csv'
location_folder = '/data/location_l_l'

# 2. ŸÖŸÑŸÅÿßÿ™ ÿßŸÑÿ±ŸäŸÅŸäŸà (ÿßŸÑÿ™ÿµÿ≠Ÿäÿ≠ ŸáŸÜÿß üëá)
# ÿ®ŸÖÿß ÿ•ŸÜ ÿßŸÑŸÖŸÑŸÅ ŸÅŸä shared/reviewsÿå ÿßŸÑÿØŸàŸÉÿ± ÿ®Ÿäÿ¥ŸàŸÅŸá ŸÅŸä /data/reviews
reviews_summary_path = '/data/reviews/hotel_sentiment_summary.csv' 

# 3. ÿßŸÑŸÖÿÆÿ±ÿ¨ ÿßŸÑŸÜŸáÿßÿ¶Ÿä
final_output_path = '/data/final_data.csv'

# --- Helper Function ---
def read_spark_csv(folder_path):
    try:
        csv_files = glob.glob(os.path.join(folder_path, "*.csv"))
        if not csv_files: return pd.DataFrame()
        return pd.read_csv(csv_files[0], on_bad_lines='skip', engine='python')
    except: return pd.DataFrame()

# ==========================================
# üìç STEP 1: PREPARE HOTELS & LOCATION
# ==========================================
print("\nüìç [Step 1] Preparing Hotels & Location Data...")

hotels_df = read_spark_csv(hotels_input_path)
final_hotels = pd.DataFrame()

if not hotels_df.empty:
    hotels_df = hotels_df.drop_duplicates(subset=['name', 'location'])
    
    loc_files = glob.glob(os.path.join(location_folder, "*.csv"))
    loc_dfs = []
    for f in loc_files:
        try: loc_dfs.append(pd.read_csv(f))
        except: pass
    
    if loc_dfs:
        full_loc_df = pd.concat(loc_dfs, ignore_index=True)
        cols = ['location', 'Latitude', 'Longitude']
        if 'OpenCage Note' in full_loc_df.columns: cols.append('OpenCage Note')
        loc_clean = full_loc_df[cols].drop_duplicates(subset=['location'])
        
        final_hotels = pd.merge(hotels_df, loc_clean, on='location', how='left')
        print(f"‚úÖ Hotels & Location merged. Count: {len(final_hotels)}")
    else:
        print("‚ö†Ô∏è No location files found. Using raw hotels data.")
        final_hotels = hotels_df
else:
    print("‚ùå Hotels data is empty! Check Spark output.")

# ==========================================
# üß† STEP 2: INJECT SENTIMENT (FROM SUMMARY FILE)
# ==========================================
print("\nüß† [Step 2] Injecting Sentiment Scores...")

sentiment_df = pd.DataFrame()

# ŸÖÿ≠ÿßŸàŸÑÿ© ŸÇÿ±ÿßÿ°ÿ© ŸÖŸÑŸÅ ÿßŸÑŸÖŸÑÿÆÿµ ÿßŸÑÿ¨ÿßŸáÿ≤
if os.path.exists(reviews_summary_path):
    print(f"üéâ Found Summary File: {reviews_summary_path}")
    try:
        summary_data = pd.read_csv(reviews_summary_path)
        
        if 'year_int' in summary_data.columns:
            sentiment_df = summary_data.groupby('hotel_name')['sentiment_score'].mean().reset_index()
        else:
            sentiment_df = summary_data[['hotel_name', 'sentiment_score']].copy()
            
        sentiment_df.rename(columns={'hotel_name': 'name', 'sentiment_score': 'ai_score'}, inplace=True)
        print(f"‚úÖ Loaded Sentiment Scores for {len(sentiment_df)} hotels.")
        
    except Exception as e:
        print(f"‚ùå Error reading summary file: {e}")
else:
    # ÿ™ÿ¥ÿÆŸäÿµ ÿßŸÑÿÆÿ∑ÿ£ ŸÑŸà ÿßŸÑŸÖŸÑŸÅ ŸÑÿ≥Ÿá ŸÖÿ¥ ŸÖŸÇÿ±ÿ¶
    print(f"‚ö†Ô∏è Summary file NOT found at: {reviews_summary_path}")
    print("   Debugging: Listing files in /data/reviews/ ...")
    try:
        print(os.listdir('/data/reviews'))
    except Exception as e:
        print(f"   Error checking folder: {e}")

# ==========================================
# üèÅ STEP 3: FINAL MERGE & EXPORT
# ==========================================
print("\nüèÅ [Step 3] Exporting Final Data...")

if not final_hotels.empty:
    final_hotels['name'] = final_hotels['name'].astype(str).str.strip()
    
    if not sentiment_df.empty:
        sentiment_df['name'] = sentiment_df['name'].astype(str).str.strip()
        final_dataset = pd.merge(final_hotels, sentiment_df, on='name', how='left')
        final_dataset['ai_score'] = final_dataset['ai_score'].fillna(4.0)
        final_dataset['ai_score'] = final_dataset['ai_score'].round(1)
    else:
        print("‚ö†Ô∏è Proceeding without custom AI scores (Using default 4.0).")
        final_dataset = final_hotels.copy()
        final_dataset['ai_score'] = 4.0

    final_dataset.to_csv(final_output_path, index=False)
    
    print(f"üéâ SUCCESS! Data saved to: {final_output_path}")
    print("-" * 30)
    print("Sample Data:")
    print(final_dataset[['name', 'ai_score']].head(5))
    print("-" * 30)
    
else:
    print("‚ùå Critical Error: No hotel data available to save.")

üöÄ Starting Final Assembly (Fast Mode using Summary)...

üìç [Step 1] Preparing Hotels & Location Data...
‚úÖ Hotels & Location merged. Count: 574

üß† [Step 2] Injecting Sentiment Scores...
üéâ Found Summary File: /data/reviews/hotel_sentiment_summary.csv
‚úÖ Loaded Sentiment Scores for 239 hotels.

üèÅ [Step 3] Exporting Final Data...
üéâ SUCCESS! Data saved to: /data/final_data.csv
------------------------------
Sample Data:
                             name  ai_score
0                 Hotel Os Poetas       4.7
1                 Villa Margaridi       4.7
2                    Hotel F√°tima       4.1
3    Harbour Inn Design Townhouse       4.8
4  Onyria Quinta da Marinha Hotel       4.7
------------------------------
