In [1]:
import os
from getpass import getpass

from pyspark.sql import SparkSession

In [2]:
%load_ext sparksql_magic

#### The following code initializes the Spark session. The Delta Lake package is included when creating the session so that Spark DataFrames can be saved as Delta tables. Unity Catalog is used as the default catalog instead of the Delta catalog, with MinIO as the object storage.
- This code is commented since Jupyter is started with a PySpark session using the same configuration mentioned here. The PySpark Jupyter script can be found in the Docker entry file of Unity Catalog, in the folder **"install-and-config\2-spark-delta-unity-minio-integration"**.
  

In [12]:
# Initialize the SparkSession with Delta Lake support and MinIO (S3A) as object storage.
# Use the latest Delta Lake jars that are compatible with the cluster's Spark version.
#
# MinIO credentials are read from the environment so they are not committed with the
# notebook: MINIO_ROOT_USER (defaults to "root") and MINIO_ROOT_PASSWORD. When the
# password variable is unset or empty, it is prompted for interactively.
minio_access_key = os.environ.get("MINIO_ROOT_USER", "root")
minio_secret_key = os.environ.get("MINIO_ROOT_PASSWORD") or getpass("MinIO secret key: ")

spark = (
    SparkSession.builder
    .appName("add-delta-lake_1")
    .master("spark://spark-master:7077")
    # Delta Lake + hadoop-aws (S3A connector) are resolved at session start-up.
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.3.0,org.apache.hadoop:hadoop-aws:3.3.4")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # S3A settings pointing at the MinIO service; path-style access is enabled.
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
    .config("spark.hadoop.fs.s3a.access.key", minio_access_key)
    .config("spark.hadoop.fs.s3a.secret.key", minio_secret_key)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.sql.warehouse.dir", "s3a://delta-bucket/delta-lake/data")
    .getOrCreate()
)

In [11]:
# To stop SparkSession.
# NOTE: this cell is placed before the cells that use `spark`; run it manually only when
# you are finished, because stopping here leaves the later cells without a running session.
spark.stop()

In [85]:
# Get spark session config details
# spark.sparkContext.getConf().getAll()

[('spark.eventLog.enabled', 'true'),
 ('spark.jars',
  'file:///root/.ivy2/jars/io.delta_delta-spark_2.12-3.2.0.jar,file:///root/.ivy2/jars/io.unitycatalog_unitycatalog-spark_2.12-0.2.0.jar,file:///root/.ivy2/jars/io.delta_delta-storage-3.2.0.jar,file:///root/.ivy2/jars/org.antlr_antlr4-runtime-4.9.3.jar,file:///root/.ivy2/jars/io.unitycatalog_unitycatalog-client-0.2.0.jar,file:///root/.ivy2/jars/org.slf4j_slf4j-api-2.0.13.jar,file:///root/.ivy2/jars/org.apache.logging.log4j_log4j-slf4j2-impl-2.23.1.jar,file:///root/.ivy2/jars/org.apache.logging.log4j_log4j-api-2.23.1.jar,file:///root/.ivy2/jars/com.fasterxml.jackson.core_jackson-databind-2.15.0.jar,file:///root/.ivy2/jars/com.fasterxml.jackson.module_jackson-module-scala_2.12-2.15.0.jar,file:///root/.ivy2/jars/com.fasterxml.jackson.core_jackson-annotations-2.15.0.jar,file:///root/.ivy2/jars/com.fasterxml.jackson.core_jackson-core-2.15.0.jar,file:///root/.ivy2/jars/com.fasterxml.jackson.dataformat_jackson-dataformat-xml-2.15.0.jar,file

In [13]:
# Build a small sample DataFrame (name / age pairs) used by the cells that follow.
columns = ["Name", "Age"]
data = [
    ("Jerin", 29),
    ("Aayush", 35),
    ("Neeraj", 28),
]
df = spark.createDataFrame(data, schema=columns)

In [14]:
df.show()

[Stage 0:>                                                          (0 + 1) / 1]

+------+---+
|  Name|Age|
+------+---+
| Jerin| 29|
|Aayush| 35|
|Neeraj| 28|
+------+---+



                                                                                

In [7]:
# Create the `demotest` schema (unqualified, so it is resolved against the current catalog)
# and print the empty result of the DDL statement.
schema_ddl_result = spark.sql("CREATE SCHEMA demotest;")
schema_ddl_result.show()

++
||
++
++



In [16]:
# Unmanaged (path-based) Delta table - save to MinIO.
# mode("overwrite") keeps this cell re-runnable: with the default save mode the write
# raises an error when the target path already exists, which breaks Restart & Run All.
df.write.format("delta").mode("overwrite").save("s3a://ucbucket/external_tables/testUnmanagedTableOnMinionewTest")

                                                                                

In [17]:
# Query the Delta table directly by its MinIO path and display the rows.
unmanaged_table_query = "select * from delta.`s3a://ucbucket/external_tables/testUnmanagedTableOnMinionewTest`;"
spark.sql(unmanaged_table_query).show()

                                                                                

+------+---+
|  Name|Age|
+------+---+
|Aayush| 35|
|Neeraj| 28|
| Jerin| 29|
+------+---+



In [13]:
%%sparksql
SHOW CATALOGS;

0
catalog
spark_catalog
unity


In [14]:
%%sparksql
SHOW SCHEMAS IN unity;

0
namespace
default
demo


In [10]:
%%sparksql
SHOW TABLES IN unity.default;

0,1,2
namespace,tableName,isTemporary
default,marksheet,False
default,marksheet_uniform,False
default,numbers,False
default,testUnityCatalogOnMinio,False
default,testUnityCatalogOnMinio123456,False
default,user_countries,False


In [16]:
%%sparksql
USE unity.default

In [17]:
%%sparksql

SELECT current_schema()

0
current_database()
default


In [9]:
# Prerequisite: create the bucket "ucbucket" in MinIO if it does not exist yet.
# The following statement creates the folders in MinIO and also registers the table name in Unity Catalog.
# Known issue: the storage location (i.e. the MinIO location) shown in the properties of the registered
# table in Unity Catalog is incorrect, because a local file system directory name is added as a prefix
# to the MinIO location.
spark.sql("""
CREATE TABLE unity.demotest.testUnityCatalogOnMinionewing (
    id INT, 
    desc STRING
) 
USING delta 
LOCATION 's3a://ucbucket/minio_data/tables/testUnityCatalogOnMinionewing';""")

25/01/20 07:57:10 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


DataFrame[]

In [16]:
%%sparksql
-- Attempt to create a table with neither a USING clause nor a LOCATION clause.
-- In the recorded run this fails with an AssertionError raised in UCProxy.createTable.
-- NOTE(review): presumably this connector version requires one or both clauses - TODO confirm.

CREATE TABLE unity.demotest.testUnityCatalogOnMinionewing123 (
    id INT, 
    desc STRING);

Py4JJavaError: An error occurred while calling o60.sql.
: java.lang.AssertionError: assertion failed
	at scala.Predef$.assert(Predef.scala:208)
	at io.unitycatalog.spark.UCProxy.createTable(UCSingleCatalog.scala:299)
	at org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension.createTable(DelegatingCatalogExtension.java:102)
	at org.apache.spark.sql.delta.catalog.DeltaCatalog.createCatalogTable(DeltaCatalog.scala:328)
	at org.apache.spark.sql.delta.catalog.DeltaCatalog.$anonfun$createTable$1(DeltaCatalog.scala:373)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:171)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:169)
	at org.apache.spark.sql.delta.catalog.DeltaCatalog.recordFrameProfile(DeltaCatalog.scala:67)
	at org.apache.spark.sql.delta.catalog.DeltaCatalog.createTable(DeltaCatalog.scala:350)
	at org.apache.spark.sql.delta.catalog.DeltaCatalog.createTable(DeltaCatalog.scala:341)
	at io.unitycatalog.spark.UCSingleCatalog.createTable(UCSingleCatalog.scala:101)
	at org.apache.spark.sql.execution.datasources.v2.CreateTableExec.run(CreateTableExec.scala:44)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:638)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:629)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:659)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:840)


In [19]:
# Expected to fail: the storage location registered in Unity Catalog for this table is incorrect,
# so the insert below raises an error (DELTA_TABLE_NOT_FOUND in the recorded output).
spark.sql("""
Insert into unity.demotest.testUnityCatalogOnMinionewing Values (1,'Jerin'),(2,'Neeraj'), (3,'Atul'),(4,'Priti'),(5,'Rahul'),(6,'Chinmay'),(7,'Ashu');
""")

AnalysisException: [DELTA_TABLE_NOT_FOUND] Delta table `demotest`.`testUnityCatalogOnMinionewing` doesn't exist.;
AppendData RelationV2[] unity.demotest.testUnityCatalogOnMinionewing unity.demotest.testUnityCatalogOnMinionewing, false
+- LocalRelation [col1#1980, col2#1981]


In [25]:
%%sparksql
-- The cell magic must be the first line of the cell, so this note is a SQL comment below it.
-- Expected to fail: the storage location registered in Unity Catalog for this table is
-- incorrect, so the query raises an error (DELTA_TABLE_NOT_FOUND in the recorded output).

Select * from unity.default.testUnityCatalogOnMinio123456;

AnalysisException: [DELTA_TABLE_NOT_FOUND] Delta table `default`.`testUnityCatalogOnMinio123456` doesn't exist.