<a href="https://colab.research.google.com/github/sireshbabu/DataAnalyst/blob/main/pysparkallconfig.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install Java 8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
!rm -f spark-3.5.1-bin-hadoop3.tgz

In [None]:
# Download and extract Spark 3.5.1
!wget -q http://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!tar xf spark-3.5.1-bin-hadoop3.tgz

In [None]:
# Install findspark
!pip install -q findspark

In [None]:
# Install the delta-spark Python package
!pip install -q delta-spark==3.2.0

In [None]:
# Point PySpark/findspark at the JDK installed above and at the extracted Spark
# distribution. NOTE(review): the JAVA_HOME path assumes an amd64 VM — confirm.
import os

os.environ.update(
    JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64",
    SPARK_HOME="/content/spark-3.5.1-bin-hadoop3",
)

In [None]:
# Make the PySpark bundled with the Spark distribution importable.
import findspark

# Called with no argument, so the Spark location comes from the environment
# (SPARK_HOME set in the previous cell).
findspark.init()

In [None]:
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

# Extra Maven artifacts to resolve alongside Delta Lake.
# Do NOT set `spark.jars.packages` on the builder directly:
# configure_spark_with_delta_pip overwrites that key with the Delta artifact that
# matches the installed delta-spark pip package, plus `extra_packages`. Anything set
# on the builder earlier would be silently dropped. (Delta 3.x also ships as
# `delta-spark_2.12`, not `delta-core_2.12`, so the helper picks the right artifact.)
EXTRA_SPARK_PACKAGES = ["org.apache.hadoop:hadoop-aws:3.3.4"]

# Configure a local SparkSession with the Delta SQL extension, Delta as the
# session catalog, and Hive metastore support.
builder = (
    SparkSession.builder
    .appName("DeltaHiveApp")
    .master("local[*]")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .enableHiveSupport()
)

# Adds the Delta jar (and EXTRA_SPARK_PACKAGES) to spark.jars.packages, then starts
# (or reuses) the session.
spark = configure_spark_with_delta_pip(builder, extra_packages=EXTRA_SPARK_PACKAGES).getOrCreate()

print("SparkSession created with Delta Lake and Hive support.")


In [None]:
# Small inventory dataset: one row per item as (id, name, quantity).
columns = ["id", "name", "quantity"]
data = [
    (1, "Apple", 50),
    (2, "Orange", 75),
    (3, "Banana", 120),
]
df = spark.createDataFrame(data, schema=columns)

# Filesystem location for the Delta table written in the next cell.
delta_table_path = "/content/my_delta_table"

In [None]:

# Persist the DataFrame as a Delta table at delta_table_path.
# "overwrite" replaces any existing contents, so re-running this cell is safe.
df.write.save(delta_table_path, format="delta", mode="overwrite")
print("DataFrame successfully written to Delta table.")

In [None]:
# Read the Delta table back into a DataFrame
# Load the Delta table written above back into a DataFrame.
delta_df = spark.read.load(delta_table_path, format="delta")

In [None]:
# Display the contents and schema of the new DataFrame
print("Reading back the Delta table:")
delta_df.show()
delta_df.printSchema()

In [None]:
from delta.tables import DeltaTable

# Ask Delta Lake whether delta_table_path holds a Delta table, then report it.
is_delta = DeltaTable.isDeltaTable(spark, delta_table_path)
print(f"Is the path a Delta table? {is_delta}")

In [None]:
# Build a small employee-location DataFrame to store as a managed Hive table.
# emp_id values line up with the `id` column of the Delta table (1, 2, 3) so the
# Delta/Hive join in the next cell matches rows. With non-overlapping keys the
# inner join would always return an empty result.
hive_data = [(1, "New York"), (2, "London"), (3, "Tokyo")]
hive_columns = ["emp_id", "city"]
hive_df = spark.createDataFrame(hive_data, hive_columns)

# Define a table name for Hive
hive_table_name = "employee_locations"

# Save as a managed table in the session catalog (Hive metastore, since the session
# was built with enableHiveSupport()). "overwrite" keeps re-runs of this cell safe.
hive_df.write.mode("overwrite").saveAsTable(hive_table_name)
print(f"\nDataFrame successfully written to Hive table '{hive_table_name}'.")

# Read the Hive table back into a DataFrame using Spark SQL
hive_read_df = spark.sql(f"SELECT * FROM {hive_table_name}")

# Display the contents and schema of the DataFrame read from Hive
print("Reading back the Hive table:")
hive_read_df.show()
hive_read_df.printSchema()

# Verify that Spark can see the table via Hive
spark.sql("SHOW TABLES").show()

In [None]:
# Register the Delta-backed DataFrame under a session-scoped name for Spark SQL.
delta_df.createOrReplaceTempView("delta_view")

# Inner join: only ids present in both the Delta view and the Hive table survive.
join_sql = f"""
    SELECT
        t1.id,
        t1.name,
        t2.city
    FROM delta_view t1
    JOIN {hive_table_name} t2 ON t1.id = t2.emp_id
"""

print("\nJoining Delta and Hive tables:")
join_df = spark.sql(join_sql)

join_df.show()