## Write a Hive table with Spark

In [3]:
!pip install pyspark==3.4.1 findspark



In [1]:
import pyspark
import findspark
import os
import sys
from pyspark.sql import SparkSession

findspark.init()

# Build the SparkSession used by every cell below:
# - the Gravitino connector plugin (presumably what exposes `catalog_hive` used later);
# - an Iceberg REST catalog registered as `catalog_rest`;
# - Hive support.
#
# Endpoints and jar paths can be overridden with environment variables. The
# defaults are the values this notebook was written against. Override them when a
# hostname does not resolve from where the kernel runs: the recorded run of the
# next cell hit `UnknownHostException: hive` for the warehouse path.
SPARK_MASTER = os.getenv("SPARK_MASTER", "spark://localhost:17077")
SPARK_JARS = os.getenv(
    "SPARK_JARS",
    "../init/spark/packages/iceberg-spark-runtime-3.4_2.12-1.5.2.jar,"
    "../init/spark/packages/gravitino-spark-connector-runtime-3.4_2.12-0.9.1.jar",
)
GRAVITINO_URI = os.getenv("GRAVITINO_URI", "http://localhost:8090")
GRAVITINO_METALAKE = os.getenv("GRAVITINO_METALAKE", "metalake_demo")
ICEBERG_REST_URI = os.getenv("ICEBERG_REST_URI", "http://localhost:9001/iceberg/")
HIVE_WAREHOUSE_DIR = os.getenv("HIVE_WAREHOUSE_DIR", "hdfs://hive:9000/user/hive/warehouse")

# Hadoop client identity used when touching HDFS.
os.environ['HADOOP_USER_NAME'] = "hive"

# Defined up front so later cells see `None` rather than a NameError if creation fails.
spark = None

try:
    spark = (
        SparkSession.builder
        .appName("PySpark SQL Example")
        .master(SPARK_MASTER)
        .config("spark.plugins", "org.apache.gravitino.spark.connector.plugin.GravitinoSparkPlugin")
        .config("spark.jars", SPARK_JARS)
        .config("spark.sql.gravitino.uri", GRAVITINO_URI)
        .config("spark.sql.gravitino.metalake", GRAVITINO_METALAKE)
        .config("spark.sql.gravitino.enableIcebergSupport", "true")
        .config("spark.sql.catalog.catalog_rest", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.catalog_rest.type", "rest")
        .config("spark.sql.catalog.catalog_rest.uri", ICEBERG_REST_URI)
        # Skip the node-locality wait when scheduling tasks.
        .config("spark.locality.wait.node", "0")
        .config("spark.sql.warehouse.dir", HIVE_WAREHOUSE_DIR)
        .enableHiveSupport()
        .getOrCreate()
    )
    # Vietnamese for "SparkSession initialized successfully!".
    print("SparkSession khởi tạo thành công!")

except Exception as e:
    print("Error when create SparkSession:", e, file=sys.stderr)
    # Re-raise instead of sys.exit(1): in Jupyter, SystemExit hides the real
    # Py4J/Java traceback. `spark` is still None here, so there is nothing to stop.
    raise

25/08/21 04:47:22 WARN Utils: Your hostname, LAPTOP-BMR4GNMS resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/08/21 04:47:22 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
25/08/21 04:47:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


SparkSession khởi tạo thành công!


In [2]:
# Switch to the Hive catalog and list the databases it exposes.
# NOTE: the recorded run failed here because the `hive` host in the warehouse
# path did not resolve from the kernel's machine.
spark.sql("use catalog_hive")
hive_databases = spark.sql("show databases")
hive_databases.show()

25/08/21 04:47:57 WARN FileSystem: Failed to initialize fileystem hdfs://hive:9000/user/hive/warehouse: java.lang.IllegalArgumentException: java.net.UnknownHostException: hive
25/08/21 04:47:57 WARN SharedState: Cannot qualify the warehouse path, leaving it unqualified.
java.lang.IllegalArgumentException: java.net.UnknownHostException: hive
	at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:466)
	at org.apache.hadoop.hdfs.NameNodeProxiesClient.createProxyWithClientProtocol(NameNodeProxiesClient.java:134)
	at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:374)
	at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:308)
	at org.apache.hadoop.hdfs.DistributedFileSystem.initDFSClient(DistributedFileSystem.java:202)
	at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:187)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3469)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at

AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient

In [None]:
# Create the `product` database and a Parquet-backed `employees` table
# partitioned by department. Both statements are idempotent (IF NOT EXISTS).
spark.sql("CREATE DATABASE IF NOT EXISTS product;")
spark.sql("USE product;")
spark.sql("CREATE TABLE IF NOT EXISTS employees (id INT, name STRING, age INT) PARTITIONED BY (department STRING) STORED AS PARQUET;")
# DESC ... EXTENDED can return more than show()'s default 20 rows and contains
# long values (location, serde/format classes) that the default 20-char
# truncation would cut off.
spark.sql("DESC TABLE EXTENDED employees;").show(n=100, truncate=False)

In [None]:
# Rows to load, keyed by partition. INSERT OVERWRITE replaces each partition,
# so re-running this cell does not duplicate data.
employee_rows_by_department = {
    "Engineering": "(1, 'John Doe', 30), (2, 'Jane Smith', 28)",
    "Marketing": "(3, 'Mike Brown', 32)",
}
for dept_name, values_sql in employee_rows_by_department.items():
    spark.sql(f"INSERT OVERWRITE TABLE employees PARTITION(department='{dept_name}') VALUES {values_sql};")

all_employees = spark.sql("SELECT * from employees")
all_employees.show()

## Query the table with Trino

In [None]:
# Trino DB-API client used by the cells below.
# NOTE(review): unpinned. Consider pinning a version and moving this install to
# the install cell at the top of the notebook.
%pip install trino

In [None]:
import os

from trino.dbapi import connect

# Trino coordinator endpoint. `trino` is presumably a Docker service name; like
# `hive` in the Spark setup cell, it may not resolve from the host. Set
# TRINO_HOST / TRINO_PORT in that case. The defaults keep the original values.
TRINO_HOST = os.getenv("TRINO_HOST", "trino")
TRINO_PORT = int(os.getenv("TRINO_PORT", "8080"))

# Create a Trino connector client
conn = connect(
    host=TRINO_HOST,
    port=TRINO_PORT,
    user="admin",
    catalog="catalog_hive",
    # NOTE(review): no `http` schema is created in this notebook. Every query
    # below uses fully-qualified names, so this session schema looks unused.
    # Confirm, or set it to "product".
    schema="http",
)

trino_client = conn.cursor()

In [None]:
# Read back the Engineering partition written by Spark, this time through Trino.
trino_client.execute("SELECT * FROM catalog_hive.product.employees WHERE department = 'Engineering'")
engineering_rows = trino_client.fetchall()
print(engineering_rows)

## Write data with Spark through the Iceberg REST service

In [None]:
# Create `sales.customers` in the Iceberg REST catalog. Every statement uses
# IF NOT EXISTS so the cell can be re-run without failing on an existing table.
spark.sql("use catalog_rest;")
spark.sql("create database if not exists sales;")
spark.sql("use sales;")
spark.sql("create table if not exists customers (customer_id int, customer_name varchar(100), customer_email varchar(100));")


In [None]:
# Append two customers, one INSERT per row, then show the table.
# NOTE: these are plain appends; re-running this cell duplicates the rows.
new_customers = [
    (11, 'Rory Brown', 'rory@123.com'),
    (12, 'Jerry Washington', 'jerry@dt.com'),
]
for cust_id, cust_name, cust_email in new_customers:
    spark.sql(f"insert into customers (customer_id, customer_name, customer_email) values ({cust_id},'{cust_name}','{cust_email}');")

customers_df = spark.sql("select * from customers")
customers_df.show()

## Federated query across Hive and Iceberg with Trino

In [None]:
# Combine customers from the Hive and Iceberg catalogs in one Trino query
# (UNION, so duplicate rows are removed).
# NOTE(review): this notebook only creates `sales.customers` in the Iceberg REST
# catalog. `catalog_hive.sales.customers` is presumably pre-seeded by the
# environment's init scripts — confirm it exists.
federated_sql = "select * from catalog_hive.sales.customers union select * from catalog_iceberg.sales.customers"
trino_client.execute(federated_sql)
print(trino_client.fetchall())