In [None]:
# Initialization: point this Python process at the local Spark installation so
# that `pyspark` (and its py4j bridge) can be imported from Spark's own zips.
import os
import sys

spark_home = "/home/talentum/spark"
os.environ["SPARK_HOME"] = spark_home
os.environ["PYLIB"] = spark_home + "/python/lib"

# Interpreters for the executors (PYSPARK_PYTHON) and the driver
# (PYSPARK_DRIVER_PYTHON); set both to /usr/bin/python2.7 to use Python 2.
# NOTE(review): the two paths differ (python3.6 vs python3); PySpark refuses to
# run when driver and worker Python minor versions differ, so confirm that
# /usr/bin/python3 is 3.6 on this machine.
os.environ.update({
    "PYSPARK_PYTHON": "/usr/bin/python3.6",
    "PYSPARK_DRIVER_PYTHON": "/usr/bin/python3",
})

# Every insert goes to the front of sys.path, so pyspark.zip (inserted last)
# ends up ahead of the py4j archive.
for archive in ("py4j-0.10.7-src.zip", "pyspark.zip"):
    sys.path.insert(0, os.environ["PYLIB"] + "/" + archive)

# Extra packages resolved when the JVM starts: spark-xml and spark-avro.
# Alternatives that were used before (pick whichever packages you need):
#   '--packages com.databricks:spark-xml_2.11:0.6.0 pyspark-shell'
#   '--packages org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'
#   '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.3 pyspark-shell'
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'

In [None]:
# Entry point (Spark 2.x): one SparkSession with Hive support enabled; the
# underlying SparkContext is exposed as `sc` for RDD-level APIs.
from pyspark.sql import SparkSession

session_builder = (
    SparkSession.builder
    .appName("Spark SQL basic example")
    .enableHiveSupport()
)
# To run on YARN, add .master("yarn") to the builder chain above.
spark = session_builder.getOrCreate()

sc = spark.sparkContext

In [None]:
# Load the raw hotel-bookings CSV into a Spark DataFrame: the first line holds
# the column names, and inferSchema makes Spark derive each column's type
# (at the cost of an extra pass over the file).
HOTEL_BOOKINGS_CSV = "file:///home/talentum/shared/Project/hotel_bookings.csv"
df = spark.read.options(header=True, inferSchema=True).csv(HOTEL_BOOKINGS_CSV)

In [None]:
# Preview the first 20 rows; long cell values are truncated.
df.show(n=20, truncate=True)

In [None]:
# Print the Schema: every column with its (inferred) data type and nullability.
df.printSchema()

In [None]:
# pandas-style shape: (number of rows, number of columns).
# df.count() triggers a Spark job; len(df.columns) only reads the schema.
num_rows, num_columns = df.count(), len(df.columns)
print(f"Shape of the DataFrame: ({num_rows}, {num_columns})")

In [None]:
# Basic summary statistics (count, mean, stddev, min, max), shown as one small
# table per column.
for column_name in df.columns:
    column_summary = df.describe(column_name)
    column_summary.show()

In [None]:
# Distinct values and how often each occurs, one table per column
# (show() prints at most 20 groups per column).
for column_name in df.columns:
    value_counts = df.groupBy(column_name).count()
    value_counts.show()

In [None]:
# Schema of the single column "arrival_date_month".
df.select(df["arrival_date_month"]).printSchema()

In [None]:
df[["arrival_date_month"]].show()  # preview the month values

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [None]:
# Convert month names ("January" ... "December") into month numbers 1..12.
# Every branch yields an integer literal. A string fallback such as
# otherwise("Na") would make Spark coerce the whole CASE WHEN to a string
# column. With no otherwise(), unmatched (or null) names become null and the
# column stays an integer column.
from pyspark.sql import functions as F

MONTH_NAMES = [
    "January", "February", "March", "April", "May", "June",
    "July", "August", "September", "October", "November", "December",
]

month_number = None
for number, name in enumerate(MONTH_NAMES, start=1):
    is_month = F.col("arrival_date_month") == name
    month_number = (
        F.when(is_month, number) if month_number is None
        else month_number.when(is_month, number)
    )

df = df.withColumn("arrival_date_month", month_number)

In [None]:
month_counts = df.groupBy("arrival_date_month").count()  # rows per month value
month_counts.show()

In [None]:
df[["arrival_date_month"]].printSchema()  # data type of the converted month column

In [None]:
from pyspark.sql.types import IntegerType

In [None]:
# Cast "arrival_date_month" to int; values that cannot be parsed as integers
# become null.
df = df.withColumn("arrival_date_month", df.arrival_date_month.cast("int"))

In [None]:
# Cast "children" to int; non-numeric entries (presumably the CSV's
# missing-value placeholder; verify against the data) become null.
df = df.withColumn("children", df.children.cast("int"))

In [None]:
df[["children"]].printSchema()  # confirm "children" is now an integer column

In [None]:
# Cast "agent" to int; non-numeric entries become null.
df = df.withColumn("agent", df.agent.cast("int"))

In [None]:
# Cast "company" to int; non-numeric entries become null.
df = df.withColumn("company", df.company.cast("int"))

In [None]:
# Show the null count of every column.
# All counts are computed in ONE aggregation pass. Running a filter + count
# per column would launch a separate Spark job (full scan) for every column.
from pyspark.sql import functions as F

null_count_row = df.select([
    F.count(F.when(F.col(c).isNull(), 1)).alias(c) for c in df.columns
]).first()
null_counts = {c: null_count_row[c] for c in df.columns}

# Loop names avoid `count`/`column`, which would shadow the pyspark functions
# of the same name pulled in by `from pyspark.sql.functions import *`.
for column_name, n_nulls in null_counts.items():
    print(f"{column_name}: {n_nulls}")

In [None]:
# Merge "agent" and "company" into a single "Distribution_Id", choosing the
# preferred source by distribution_channel:
#   * Direct / Corporate -> company, falling back to agent
#   * TA/TO              -> agent, falling back to company
#   * any other channel  -> 0
# A matched row stays null only when both agent and company are null.
# NOTE(review): 0 is used as a placeholder id; confirm it cannot collide with a
# real agent/company id.
from pyspark.sql.functions import col, when, coalesce

channel = col("distribution_channel")
df = df.withColumn(
    "Distribution_Id",
    when(channel.isin("Direct", "Corporate"), coalesce(col("company"), col("agent")))
    .when(channel == "TA/TO", coalesce(col("agent"), col("company")))
    .otherwise(0)  # default for unmatched distribution_channel values
)

null_count = df.filter(col('Distribution_Id').isNull()).count()
print(f"Null values in 'Distribution_Id': {null_count}")

In [None]:
# Fill nulls in "children" with the column's mode (most frequent non-null value).
# Ties are broken by the smallest value so re-runs pick the same mode. If the
# column has no non-null values at all, there is no mode and nothing is filled.
from pyspark.sql.functions import col, count

mode_rows = (
    df.filter(col("children").isNotNull())
    .groupBy("children")
    .agg(count("children").alias("count"))
    .orderBy(col("count").desc(), col("children").asc())
    .limit(1)
    .collect()
)
if mode_rows:
    mode_value = mode_rows[0]["children"]
    df = df.fillna({"children": mode_value})

In [None]:
# Fill nulls in "Distribution_Id" with the column's mode (most frequent non-null
# value). Ties are broken by the smallest id so re-runs pick the same mode. If
# the column has no non-null values at all, nothing is filled.
from pyspark.sql.functions import col, count

mode_rows = (
    df.filter(col("Distribution_Id").isNotNull())
    .groupBy("Distribution_Id")
    .agg(count("Distribution_Id").alias("count"))
    .orderBy(col("count").desc(), col("Distribution_Id").asc())
    .limit(1)
    .collect()
)
if mode_rows:
    mode_value = mode_rows[0]["Distribution_Id"]
    df = df.fillna({"Distribution_Id": mode_value})

In [None]:
# Again take the null count of every column, to verify the fills above.
# All counts are computed in ONE aggregation pass. Running a filter + count
# per column would launch a separate Spark job (full scan) for every column.
from pyspark.sql import functions as F

null_count_row = df.select([
    F.count(F.when(F.col(c).isNull(), 1)).alias(c) for c in df.columns
]).first()
null_counts = {c: null_count_row[c] for c in df.columns}

# Loop names avoid `count`/`column`, which would shadow the pyspark functions
# of the same name pulled in by `from pyspark.sql.functions import *`.
for column_name, n_nulls in null_counts.items():
    print(f"{column_name}: {n_nulls}")

In [None]:
# Shape after cleaning: (number of rows, number of columns).
num_rows, num_columns = df.count(), len(df.columns)
print(f"Shape of DataFrame: ({num_rows}, {num_columns})")

In [None]:
# Correlation between the output variable "is_canceled" and "reservation_status".
# "reservation_status" is a string column, so StringIndexer first maps each
# distinct status to a numeric index (by default ordered by descending label
# frequency).
# NOTE(review): a Pearson correlation on such an index depends on that
# arbitrary label ordering. Treat the number as a check of how strongly the
# status encodes the cancellation outcome, not as a linear relationship.
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import corr

# `spark` already exists (entry-point cell). Calling SparkSession.builder...
# getOrCreate() again would only hand back that same session.

# Convert 'reservation_status' column to numeric using StringIndexer
indexer = StringIndexer(inputCol='reservation_status', outputCol='reservation_status_index')
data = indexer.fit(df).transform(df)

# Calculate the correlation between 'is_canceled' and 'reservation_status_index'
correlation = data.stat.corr('is_canceled', 'reservation_status_index')
print(f"Correlation between 'is_canceled' and 'reservation_status': {correlation}")


In [None]:
# Drop the less important columns. agent/company are already merged into
# Distribution_Id; market_segment and reservation_status_date are not used
# further.
columns_to_drop = ["agent", "company", "market_segment", "reservation_status_date"]
df = df.drop(*columns_to_drop)

In [None]:
# Pairwise (Pearson) correlation between all numeric columns.
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation
import numpy as np

# `spark` already exists (entry-point cell). Building another SparkSession
# here would only return that same session.

# Step 1: Select the numeric columns. Match every Spark numeric type name, not
# just int/double, so bigint/float/decimal columns are not silently skipped.
NUMERIC_DTYPES = {"tinyint", "smallint", "int", "bigint", "float", "double"}
numeric_columns = [
    name for name, dtype in df.dtypes
    if dtype in NUMERIC_DTYPES or dtype.startswith("decimal")
]
df_numeric = df.select(numeric_columns)

# Step 2: Pack the numeric columns into one vector column, the input
# Correlation.corr expects. VectorAssembler's default handleInvalid="error"
# fails on nulls, so this relies on the null filling done earlier.
assembler = VectorAssembler(inputCols=numeric_columns, outputCol="features")
df_vector = assembler.transform(df_numeric).select("features")

# Step 3: Calculate the correlation matrix (Pearson by default).
correlation_matrix = Correlation.corr(df_vector, "features").head()[0]

# Step 4: Show the matrix as a DataFrame. Each row is prefixed with the name of
# the column it belongs to, so the matrix can be read.
# NOTE(review): assumes no numeric column is itself named "column".
matrix = np.asarray(correlation_matrix.toArray())
labelled_rows = [
    [name] + values for name, values in zip(numeric_columns, matrix.tolist())
]
matrix_df = spark.createDataFrame(labelled_rows, ["column"] + numeric_columns)
matrix_df.show(truncate=False)

In [None]:
# Save the cleaned data. Spark writes a directory named "Cleaned_Data.csv" that
# contains one part-file per partition, not a single CSV file.
# header=True keeps the column names (CSV output omits them by default).
# mode="overwrite" lets the notebook be re-run instead of failing because the
# output path already exists.
# NOTE(review): the relative path resolves against Spark's default filesystem
# (HDFS if configured); use an explicit file:// or hdfs:// URI if that matters.
df.write.csv("Cleaned_Data.csv", header=True, mode="overwrite")