In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [30]:
username = "052123"

In [2]:
# Spark Dynamic Allocation (DA) is off by default, but CML overrides that to true.
# DA is disabled here for demo purposes; we will use it later.

# Spark Adaptive Query Execution (AQE) is on by default in Spark 3.2+.
# AQE is disabled here for demo purposes; we will use it later.

spark = SparkSession\
    .builder\
    .appName("PythonSQL")\
    .config("spark.hadoop.fs.s3a.s3guard.ddb.region","us-east-2")\
    .config("spark.yarn.access.hadoopFileSystems","s3a://go01-demo")\
    .config("spark.sql.adaptive.enabled", "false")\
    .config("spark.dynamicAllocation.enabled", "false")\
    .getOrCreate()

Setting spark.hadoop.yarn.resourcemanager.principal to pauldefusco


In [3]:
spark.sparkContext.getConf().getAll()

[('spark.dynamicAllocation.enabled', 'false'),
 ('spark.eventLog.enabled', 'true'),
 ('spark.ui.proxyRedirectUri',
  'https://spark-hgx2s40gc88b8mgh.ml-4c5feac0-3ec.go01-dem.ylcu-atmi.cloudera.site'),
 ('spark.hadoop.fs.s3a.s3guard.ddb.region', 'us-east-2'),
 ('spark.network.crypto.enabled', 'true'),
 ('spark.kubernetes.driver.pod.name', 'hgx2s40gc88b8mgh'),
 ('spark.kerberos.renewal.credentials', 'ccache'),
 ('spark.dynamicAllocation.maxExecutors', '49'),
 ('spark.eventLog.dir', 'file:///sparkeventlogs'),
 ('spark.hadoop.yarn.resourcemanager.principal', 'pauldefusco'),
 ('spark.kubernetes.driver.annotation.cluster-autoscaler.kubernetes.io/safe-to-evict',
  'false'),
 ('spark.ui.port', '20049'),
 ('spark.kubernetes.executor.annotation.cluster-autoscaler.kubernetes.io/safe-to-evict',
  'false'),
 ('spark.driver.memory', '3051m'),
 ('spark.io.encryption.enabled', 'true'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.driver.bindAddress', '100.100.92.161'),
 ('spark.submit.depl
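
The full configuration list is long, so it can help to narrow it to the settings under discussion. The following is a minimal sketch, assuming the list of `(key, value)` tuples returned by `getAll()` has been captured in a variable. `filter_conf` and `conf_pairs` are illustrative names, and the sample values are copied from the output above.

```python
def filter_conf(pairs, prefix):
    """Return the (key, value) pairs whose key starts with prefix, sorted by key."""
    return sorted((k, v) for k, v in pairs if k.startswith(prefix))


# Sample pairs copied from the output above. In the notebook this could be:
# conf_pairs = spark.sparkContext.getConf().getAll()
conf_pairs = [
    ("spark.dynamicAllocation.enabled", "false"),
    ("spark.eventLog.enabled", "true"),
    ("spark.dynamicAllocation.maxExecutors", "49"),
    ("spark.driver.memory", "3051m"),
]

# Keep only the Dynamic Allocation settings
da_settings = filter_conf(conf_pairs, "spark.dynamicAllocation.")
for key, value in da_settings:
    print(f"{key} = {value}")
```

The same helper can be pointed at other prefixes, such as `spark.kubernetes.`, to inspect a different group of settings.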

#### Hive Metastore

Spark SQL uses a Hive metastore to manage the metadata of persistent relational entities (e.g. databases, tables, columns, partitions). Two components are involved: the Hive metastore warehouse (aka spark-warehouse) is the directory where Spark SQL persists table data, whereas the Hive metastore (aka metastore_db) is the relational database that stores the metadata for those entities, which keeps metadata lookups fast.

#### Hive Warehouse Connector

The Hive Warehouse Connector (HWC) is software for securely accessing Hive tables from Spark. You need HWC to access Hive managed tables from Spark. To write to managed tables, you use HWC explicitly by calling the HiveWarehouseConnector API. Reads can go through HWC implicitly, without you realizing it, when you run a Spark SQL query on a Hive managed table.

You do not need HWC to read or write Hive external tables; native Spark SQL is enough. Tables created from Spark are still tracked in the Hive Metastore (HMS).

In this tutorial we will not use the HWC.

In [4]:
# Show catalog and database
spark.sql("SHOW CURRENT NAMESPACE").show()

+-------------+---------+
|      catalog|namespace|
+-------------+---------+
|spark_catalog|  default|
+-------------+---------+



In [5]:
# Create a new database
spark.sql("CREATE DATABASE IF NOT EXISTS spark_catalog.spark")
spark.sql("USE spark_catalog.spark")

Hive Session ID = cbd83497-a3c6-487d-a297-eaf925f12e0e


DataFrame[]

In [6]:
# Show catalog and database
spark.sql("SHOW CURRENT NAMESPACE").show()

+-------------+---------+
|      catalog|namespace|
+-------------+---------+
|spark_catalog|    spark|
+-------------+---------+



#### Non-Partitioned Table

In [7]:
spark.sql("DROP TABLE IF EXISTS spark.non_partitioned_table")
spark.sql("CREATE TABLE IF NOT EXISTS spark.non_partitioned_table\
            (id BIGINT, state STRING, country STRING)")

DataFrame[]

In [8]:
spark.sql("INSERT INTO spark.non_partitioned_table VALUES (1, 'CA', 'USA'),(2, 'CA', 'USA'),\
                    (3, 'AZ', 'USA'),\
                    (4, 'ON', 'CAN'),\
                    (5, 'AL', 'CAN')")

DataFrame[]

Hue screenshot here

In [9]:
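from pyspark.sql.utils import AnalysisException

try:
    spark.sql("SHOW PARTITIONS spark.non_partitioned_table").show()
except AnalysisException:
    # SHOW PARTITIONS fails with an AnalysisException when the table has no partitions
    print("There are no partitions to be shown")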

There are no partitions to be shown


Hue Screenshot here

#### Partitioned Table

In [10]:
spark.sql("DROP TABLE IF EXISTS spark.partitioned_table")
spark.sql("CREATE TABLE IF NOT EXISTS spark.partitioned_table\
    (id BIGINT, state STRING, country STRING)\
    USING PARQUET\
    PARTITIONED BY (country)")

DataFrame[]

In [11]:
spark.sql("INSERT INTO spark.partitioned_table VALUES (1, 'CA', 'USA'),(2, 'CA', 'USA'),\
                    (3, 'AZ', 'USA'),\
                    (4, 'ON', 'CAN'),\
                    (5, 'AL', 'CAN')")

DataFrame[]

Hue screenshot here

In [12]:
spark.sql("SHOW PARTITIONS spark.partitioned_table").show()

+-----------+
|  partition|
+-----------+
|country=CAN|
|country=USA|
+-----------+
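
Each row returned by `SHOW PARTITIONS` uses the Hive-style `column=value` layout, with nested partition levels separated by `/`. The following is a minimal sketch of turning those strings into dictionaries. `parse_partition` is a hypothetical helper, not part of the PySpark API, and it does not unescape special characters in partition values.

```python
def parse_partition(spec):
    """Split a Hive-style partition string such as 'country=USA' into a dict."""
    result = {}
    for part in spec.split("/"):
        # str.partition splits on the first '=' only, so values may contain '='
        column, _, value = part.partition("=")
        result[column] = value
    return result


# Values copied from the SHOW PARTITIONS output above
partitions = [parse_partition(p) for p in ["country=CAN", "country=USA"]]
print(partitions)
```

A table partitioned on more than one column would produce strings such as `country=USA/state=CA`, which the same helper splits into one dictionary entry per level.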



### Spark Explain with Non-Partitioned Data

In [13]:
df_notp = spark.sql("SELECT * FROM spark.non_partitioned_table WHERE country='USA'")

In [14]:
df_notp.explain(mode="extended")

== Parsed Logical Plan ==
'Project [*]
+- 'Filter ('country = USA)
   +- 'UnresolvedRelation [spark, non_partitioned_table], [], false

== Analyzed Logical Plan ==
id: bigint, state: string, country: string
Project [id#112L, state#113, country#114]
+- Filter (country#114 = USA)
   +- SubqueryAlias spark_catalog.spark.non_partitioned_table
      +- HiveTableRelation [`spark`.`non_partitioned_table`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#112L, state#113, country#114], Partition Cols: []]

== Optimized Logical Plan ==
Filter (isnotnull(country#114) AND (country#114 = USA))
+- HiveTableRelation [`spark`.`non_partitioned_table`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#112L, state#113, country#114], Partition Cols: []]

== Physical Plan ==
*(1) Filter (isnotnull(country#114) AND (country#114 = USA))
+- Scan hive spark.non_partitioned_table [id#112L, state#113, country#114], HiveTableRelation [`spark`.`non_partitioned_table`, org.apache.

In [15]:
df_notp.explain(mode="codegen")

Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 (maxMethodCodeSize:282; maxConstantPoolSize:121(0.18% used); numInnerClasses:0) ==
*(1) Filter (isnotnull(country#114) AND (country#114 = USA))
+- Scan hive spark.non_partitioned_table [id#112L, state#113, country#114], HiveTableRelation [`spark`.`non_partitioned_table`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#112L, state#113, country#114], Partition Cols: []]

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private scala.collection.Iterator inputadapter_input_0;
/* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowW

In [16]:
df_notp.explain(mode="cost")

== Optimized Logical Plan ==
Filter (isnotnull(country#114) AND (country#114 = USA)), Statistics(sizeInBytes=45.0 B)
+- HiveTableRelation [`spark`.`non_partitioned_table`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#112L, state#113, country#114], Partition Cols: []], Statistics(sizeInBytes=45.0 B)

== Physical Plan ==
*(1) Filter (isnotnull(country#114) AND (country#114 = USA))
+- Scan hive spark.non_partitioned_table [id#112L, state#113, country#114], HiveTableRelation [`spark`.`non_partitioned_table`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#112L, state#113, country#114], Partition Cols: []]




In [17]:
df_notp.explain(mode="formatted")

== Physical Plan ==
* Filter (2)
+- Scan hive spark.non_partitioned_table (1)


(1) Scan hive spark.non_partitioned_table
Output [3]: [id#112L, state#113, country#114]
Arguments: [id#112L, state#113, country#114], HiveTableRelation [`spark`.`non_partitioned_table`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#112L, state#113, country#114], Partition Cols: []]

(2) Filter [codegen id : 1]
Input [3]: [id#112L, state#113, country#114]
Condition : (isnotnull(country#114) AND (country#114 = USA))




In [18]:
df_notp.write.mode("overwrite").parquet("s3a://go01-demo/lakehouse/nonpar")

Hue screenshot here

### Spark Explain with Partitioned Data 

In [19]:
df_p = spark.sql("SELECT * FROM spark.partitioned_table WHERE country = 'USA'")

In [20]:
df_p.explain(mode="extended")

== Parsed Logical Plan ==
'Project [*]
+- 'Filter ('country = USA)
   +- 'UnresolvedRelation [spark, partitioned_table], [], false

== Analyzed Logical Plan ==
id: bigint, state: string, country: string
Project [id#121L, state#122, country#123]
+- Filter (country#123 = USA)
   +- SubqueryAlias spark_catalog.spark.partitioned_table
      +- Relation spark.partitioned_table[id#121L,state#122,country#123] parquet

== Optimized Logical Plan ==
Filter (isnotnull(country#123) AND (country#123 = USA))
+- Relation spark.partitioned_table[id#121L,state#122,country#123] parquet

== Physical Plan ==
*(1) ColumnarToRow
+- FileScan parquet spark.partitioned_table[id#121L,state#122,country#123] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3a://go01-demo/warehouse/tablespace/external/hive/spark.db/partitione..., PartitionFilters: [isnotnull(country#123), (country#123 = USA)], PushedFilters: [], ReadSchema: struct<id:bigint,state:string>



In [21]:
df_p.explain(mode="codegen")

Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 (maxMethodCodeSize:324; maxConstantPoolSize:139(0.21% used); numInnerClasses:0) ==
*(1) ColumnarToRow
+- FileScan parquet spark.partitioned_table[id#121L,state#122,country#123] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3a://go01-demo/warehouse/tablespace/external/hive/spark.db/partitione..., PartitionFilters: [isnotnull(country#123), (country#123 = USA)], PushedFilters: [], ReadSchema: struct<id:bigint,state:string>

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private int columnartorow_batchIdx_0;
/* 010 */   private org

In [22]:
df_p.explain(mode="cost")

== Optimized Logical Plan ==
Filter (isnotnull(country#123) AND (country#123 = USA)), Statistics(sizeInBytes=1398.0 B)
+- Relation spark.partitioned_table[id#121L,state#122,country#123] parquet, Statistics(sizeInBytes=1398.0 B)

== Physical Plan ==
*(1) ColumnarToRow
+- FileScan parquet spark.partitioned_table[id#121L,state#122,country#123] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3a://go01-demo/warehouse/tablespace/external/hive/spark.db/partitione..., PartitionFilters: [isnotnull(country#123), (country#123 = USA)], PushedFilters: [], ReadSchema: struct<id:bigint,state:string>




In [23]:
df_p.explain(mode="formatted")

== Physical Plan ==
* ColumnarToRow (2)
+- Scan parquet spark.partitioned_table (1)


(1) Scan parquet spark.partitioned_table
Output [3]: [id#121L, state#122, country#123]
Batched: true
Location: InMemoryFileIndex [s3a://go01-demo/warehouse/tablespace/external/hive/spark.db/partitioned_table/country=USA]
PartitionFilters: [isnotnull(country#123), (country#123 = USA)]
ReadSchema: struct<id:bigint,state:string>

(2) ColumnarToRow [codegen id : 1]
Input [3]: [id#121L, state#122, country#123]
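
The main difference between the two sets of plans is the `PartitionFilters` entry. For the partitioned table the `country` predicate is applied when the files are listed, so the scan location is limited to the `country=USA` directory, whereas the non-partitioned plan runs a `Filter` on top of a scan of the whole table. The following is a minimal sketch for checking this on plan text, assuming the text has been copied from the `explain` output. `partition_filters` is a hypothetical helper.

```python
import re


def partition_filters(plan_text):
    """Return the contents of the first 'PartitionFilters: [...]' entry, or None."""
    match = re.search(r"PartitionFilters: \[([^\]]*)\]", plan_text)
    return match.group(1) if match else None


# Lines copied from the formatted plans above
partitioned_plan = "PartitionFilters: [isnotnull(country#123), (country#123 = USA)]"
non_partitioned_plan = "Condition : (isnotnull(country#114) AND (country#114 = USA))"

print(partition_filters(partitioned_plan))
print(partition_filters(non_partitioned_plan))
```

A result of `None` means the plan text has no `PartitionFilters` entry at all, while an empty string means the entry is present but no partition predicate was applied.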




In [24]:
df_p.write.partitionBy("country").mode("overwrite").parquet("s3a://go01-demo/lakehouse/part")

Hue Screenshot here

### Working with More Data

In [25]:
from pyspark.sql.types import LongType, IntegerType, StringType

import dbldatagen as dg

INFO: Version : VersionInfo(major='0', minor='2', patch='1', release='', build='')


In [27]:
def generate_df(row_count = 100000, unique_vals=100000):

    #shuffle_partitions_requested = 8
    
    #spark.conf.set("spark.sql.shuffle.partitions", shuffle_partitions_requested)

    country_codes = [
        "CN", "US", "FR", "CA", "IN", "JM", "IE", "PK", "GB", "IL", "AU", 
        "SG", "ES", "GE", "MX", "ET", "SA", "LB", "NL", "IT"
    ]
    #country_weights = [
    #    1300, 365, 67, 38, 1300, 3, 7, 212, 67, 9, 25, 6, 47, 83, 
    #    126, 109, 58, 8, 17,
    #]

    manufacturers = [
        "Delta corp", "Xyzzy Inc.", "Lakehouse Ltd", "Acme Corp", "Embanks Devices",
    ]

    lines = ["delta", "xyzzy", "lakehouse", "gadget", "droid"]

    testDataSpec = (
        dg.DataGenerator(spark, name="device_data_set", rows=row_count) #,partitions=partitions_num)
        .withIdOutput()
        # we'll use hash of the base field to generate the ids to
        # avoid a simple incrementing sequence
        .withColumn("internal_device_id", "long", minValue=0x1000000000000, 
                    uniqueValues=unique_vals, omit=True, baseColumnType="hash",
        )
        # note for format strings, we must use "%lx" not "%x" as the
        # underlying value is a long
        .withColumn(
            "device_id", "string", format="0x%013x", baseColumn="internal_device_id"
        )
        # the device / user attributes will be the same for the same device id
        # so let's use the internal device id as the base column for these attributes
        .withColumn("country", "string", values=country_codes, #weights=country_weights, 
                    baseColumn="internal_device_id")
        .withColumn("manufacturer", "string", values=manufacturers, 
                    baseColumn="internal_device_id", )
        # use omit = True if you don't want a column to appear in the final output
        # but just want to use it as part of generation of another column
        .withColumn("line", "string", values=lines, baseColumn="manufacturer", 
                    baseColumnType="hash", omit=True )
        .withColumn("model_ser", "integer", minValue=1, maxValue=11, baseColumn="device_id", 
                    baseColumnType="hash", omit=True, )
        .withColumn("model_line", "string", expr="concat(line, '#', model_ser)", 
                    baseColumn=["line", "model_ser"] )
        .withColumn("event_type", "string", 
                    values=["activation", "deactivation", "plan change", "telecoms activity", 
                            "internet activity", "device error", ],
                    random=True)
        .withColumn("event_ts", "timestamp", begin="2020-01-01 01:00:00", 
                    end="2020-12-31 23:59:00", 
                    interval="1 minute", random=True )
    )

    dfTestData = testDataSpec.build()

    display(dfTestData)
    
    return dfTestData

When a Spark application runs on YARN or another cluster manager, the default number of partitions for an RDD, DataFrame, or Dataset equals the total number of cores across all executor nodes.

Spark automatically sets the number of “map” tasks to run on each file according to its size (though you can control it through optional parameters to SparkContext.textFile, etc), and for distributed “reduce” operations, such as groupByKey and reduceByKey, it uses the largest parent RDD’s number of partitions.
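
As a rough illustration of the rule above, the Spark configuration documentation describes the cluster-mode default for `spark.default.parallelism` as the total number of cores on all executor nodes or 2, whichever is larger. The sketch below only reproduces that arithmetic. `default_parallelism` is a hypothetical helper, and the executor counts are made-up values rather than the settings of this cluster.

```python
def default_parallelism(num_executors, cores_per_executor):
    """Estimate the cluster-mode default: total executor cores, with a floor of 2."""
    return max(num_executors * cores_per_executor, 2)


# Hypothetical cluster sizes, chosen only to illustrate the calculation
print(default_parallelism(4, 4))
print(default_parallelism(1, 1))
```

The actual value used by a running session can be read from `spark.sparkContext.defaultParallelism`.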

In [28]:
df = generate_df(row_count = 100000, unique_vals=100000)

DataFrame[id: bigint, device_id: string, country: string, manufacturer: string, model_line: string, event_type: string, event_ts: timestamp]

In [29]:
df.rdd.getNumPartitions()

10

In [31]:
df.write.mode("overwrite").saveAsTable('SPARK.IOT_DATA_{}'.format(username), format="parquet") #partitionBy()

In [35]:
spark.conf.get("spark.sql.warehouse.dir")

's3a://go01-demo/warehouse/tablespace/external/hive'

In [41]:
spark.conf.get("spark.kubernetes.container.image")

'docker.repository.cloudera.com/cloudera/cdsw/ml-runtime-jupyterlab-python3.7-standard:2022.11.1-b2'

In [48]:
spark.conf.get("spark.driver.cores")

Py4JJavaError: An error occurred while calling o251.get.
: java.util.NoSuchElementException: spark.driver.cores
	at org.apache.spark.sql.errors.QueryExecutionErrors$.noSuchElementExceptionError(QueryExecutionErrors.scala:1494)
	at org.apache.spark.sql.internal.SQLConf.$anonfun$getConfString$3(SQLConf.scala:4188)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.internal.SQLConf.getConfString(SQLConf.scala:4188)
	at org.apache.spark.sql.RuntimeConfig.get(RuntimeConfig.scala:72)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
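
The error above occurs because `spark.conf.get` raises when the key has no value set and no default is supplied. The following is a minimal sketch that passes a fallback instead. `get_conf_or_default` is a hypothetical wrapper, and it relies on the optional second argument of `spark.conf.get`.

```python
def get_conf_or_default(spark, key, default=None):
    """Look up a runtime config value, returning default when the key is unset."""
    # The second argument to spark.conf.get is used as the fallback value
    return spark.conf.get(key, default)
```

In the notebook this could be called as `get_conf_or_default(spark, "spark.driver.cores", "1")`, where the fallback `"1"` is an assumption based on Spark's documented default for `spark.driver.cores`.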


Turn AQE and DA off. Run Spark jobs.

Turn DA on and add a section on it.

Turn AQE on and add a section on it.

#### Spark Challenges

Spark Merge Into Workaround

Spark Schema Evolution Workaround

Spark Partition Evolution Workaround