In [1]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("HiveSparkIntegration")
    .config("spark.sql.catalogImplementation", "hive")
    .config("spark.hadoop.hive.metastore.uris", "thrift://hive-metastore:9083")
    .enableHiveSupport()
    .getOrCreate()
)


25/07/27 02:29:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
spark.sql("Show databases").show()

+-----------+
|  namespace|
+-----------+
|    default|
|explanation|
|    test_db|
+-----------+



In [37]:
# Read data from HDFS
df = spark.read.option("delimiter", "|").option("header", "false").csv("hdfs://namenode:8020/user/admin/data/nation.tbl")


# Show sample data
df.show(5)


+---+---------+---+--------------------+
|_c0|      _c1|_c2|                 _c3|
+---+---------+---+--------------------+
|  0|  ALGERIA|  0| haggle. carefull...|
|  1|ARGENTINA|  1|al foxes promise ...|
|  2|   BRAZIL|  1|y alongside of th...|
|  3|   CANADA|  1|eas hang ironic, ...|
|  4|    EGYPT|  4|y above the caref...|
+---+---------+---+--------------------+
only showing top 5 rows



In [38]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Define schema based on your actual data columns
schema = StructType([
    StructField("id", StringType(), True),
    StructField("country", StringType(), True),
    StructField("region_id", StringType(), True),
    StructField("description", StringType(), True)
 

])

# Reload with schema and delimiter
df = spark.read.schema(schema).option("delimiter", "|").csv("hdfs://namenode:8020/user/admin/data/nation.tbl")
df.show(5)


+---+---------+---------+--------------------+
| id|  country|region_id|         description|
+---+---------+---------+--------------------+
|  0|  ALGERIA|        0| haggle. carefull...|
|  1|ARGENTINA|        1|al foxes promise ...|
|  2|   BRAZIL|        1|y alongside of th...|
|  3|   CANADA|        1|eas hang ironic, ...|
|  4|    EGYPT|        4|y above the caref...|
+---+---------+---------+--------------------+
only showing top 5 rows



In [39]:
df.createOrReplaceTempView("temp_nation")

In [40]:
# Example: Select first 10 rows
result = spark.sql("SELECT * FROM temp_nation LIMIT 10")
result.show()

count = spark.sql("SELECT COUNT(*) as total FROM temp_nation")
count.show()


+---+---------+---------+--------------------+
| id|  country|region_id|         description|
+---+---------+---------+--------------------+
|  0|  ALGERIA|        0| haggle. carefull...|
|  1|ARGENTINA|        1|al foxes promise ...|
|  2|   BRAZIL|        1|y alongside of th...|
|  3|   CANADA|        1|eas hang ironic, ...|
|  4|    EGYPT|        4|y above the caref...|
|  5| ETHIOPIA|        0|ven packages wake...|
|  6|   FRANCE|        3|refully final req...|
|  7|  GERMANY|        3|l platelets. regu...|
|  8|    INDIA|        2|ss excuses cajole...|
|  9|INDONESIA|        2| slyly express as...|
+---+---------+---------+--------------------+

+-----+
|total|
+-----+
|   25|
+-----+



In [41]:
explain_df = spark.sql("EXPLAIN SELECT * FROM temp_nation WHERE id = '1'")

explain_df.show(truncate=False)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 

In [9]:
df1 = spark.sql("Show databases")
df1.show()

+-----------+
|  namespace|
+-----------+
|    default|
|explanation|
|    test_db|
+-----------+



In [42]:
spark.sql("CREATE TABLE test_db.nation AS SELECT * FROM temp_nation")

DataFrame[]

In [43]:
spark.sql("Select * from test_db.nation")

DataFrame[id: string, country: string, region_id: string, description: string]