### Set up the Spark environment.

In [5]:
import subprocess
result = subprocess.run(["gcloud", "config", "get-value", "project"], stdout=subprocess.PIPE)
PROJECT_ID = result.stdout.decode("utf-8").strip()

# Print the output
print(PROJECT_ID)

qwiklabs-gcp-02-39d418f1eb5a


In [6]:
SOURCE_PATH = f'gs://{PROJECT_ID}-data/datasets'
DEST_PATH = f'gs://{PROJECT_ID}-output'
print(SOURCE_PATH, DEST_PATH)

gs://qwiklabs-gcp-02-39d418f1eb5a-data/datasets gs://qwiklabs-gcp-02-39d418f1eb5a-output


In [9]:
orders = spark.read.format('avro').load(f'{SOURCE_PATH}/northwind/AVRO/orders')
orders.show()


DataFrame[orderid: int, customerid: string, employeeid: int, orderdate: string, requireddate: string, shippeddate: string, shipvia: int, freight: double, shipname: string, shipaddress: string, shipcity: string, shipregion: string, shippostalcode: string, shipcountry: string]

In [17]:
from pyspark.sql.functions import current_timestamp

# Assuming your data has a column named 'order_date' that can be used for timestamps
orders = orders.withColumn('ts', current_timestamp().alias('ts'))

In [18]:
orders.show()

+-------+----------+----------+----------+------------+-----------+-------+-------+--------------------+--------------------+--------------+----------+--------------+-----------+--------------------+
|orderid|customerid|employeeid| orderdate|requireddate|shippeddate|shipvia|freight|            shipname|         shipaddress|      shipcity|shipregion|shippostalcode|shipcountry|                  ts|
+-------+----------+----------+----------+------------+-----------+-------+-------+--------------------+--------------------+--------------+----------+--------------+-----------+--------------------+
|  10248|     VINET|         5|1996-07-04|  1996-08-01| 1996-07-16|      3|  32.38|Vins et alcools C...|  59 rue de l'Abbaye|         Reims|      NULL|         51100|     France|2025-01-22 04:13:...|
|  10249|     TOMSP|         6|1996-07-05|  1996-08-16| 1996-07-10|      1|  11.61|  Toms Spezialitäten|       Luisenstr. 48|       Münster|      NULL|         44087|    Germany|2025-01-22 04:13:...|


## Spark DataFrames can be saved to a Hive table using either the saveAsTable method

In [19]:
# Hudi configurations
hudi_table_path = f"{DEST_PATH}/hudi/orders"
hudi_options = {
    'hoodie.table.name': 'orders'
    ,'hoodie.datasource.write.recordkey.field': 'orderid' # Primary key
    ,'hoodie.datasource.write.partitionpath.field': ''
    ,'hoodie.datasource.write.table.type': 'COPY_ON_WRITE'
    ,'hoodie.datasource.write.operation': 'upsert'  # Use 'insert' for initial load
    # ,'hoodie.deltastreamer.schemaprovider.source.schema': orders.schema.json() # Important for schema evolution with deltastreamer
}

# Write the DataFrame to Hudi
orders.write.format("hudi").options(**hudi_options).mode("overwrite").save(hudi_table_path)


25/01/22 04:13:24 WARN HoodieSparkSqlWriterInternal: hoodie table at gs://qwiklabs-gcp-02-39d418f1eb5a-output/hudi/orders already exists. Deleting existing data & overwriting with new data.
25/01/22 04:13:49 WARN HoodieSparkSqlWriterInternal: Closing write client       


In [25]:
%load_ext sparksql_magic

In [30]:
%%sparksql
show tables

0,1,2
namespace,tableName,isTemporary
default,orders,False


In [58]:
# Hudi configurations
hudi_table_name = 'orders'
hudi_table_path = f"{DEST_PATH}/hudi/orders"
hudi_options = {
    'hoodie.table.name': 'orders'
    ,'hoodie.datasource.write.recordkey.field': 'orderid' # Primary key
    ,'hoodie.datasource.write.partitionpath.field': ''
    ,'hoodie.datasource.write.table.type': 'COPY_ON_WRITE'
    ,'hoodie.datasource.write.operation': 'upsert'  # Use 'insert' for initial load
    # ,'hoodie.deltastreamer.schemaprovider.source.schema': orders.schema.json() # Important for schema evolution with deltastreamer
}

# Write the DataFrame to HudiTable
orders.write.format("hudi").options(**hudi_options).mode("overwrite").saveAsTable(hudi_table_name)


25/01/22 04:46:38 WARN TableSchemaResolver: Could not find any data file written for commit, so could not get schema for table gs://gcs-bucket-dataproc-metastore-5bd62580-6667-45d7-a4b8-a53880bd1/hive-warehouse/orders
25/01/22 04:46:38 WARN HoodieSparkSqlWriterInternal: hoodie table at gs://gcs-bucket-dataproc-metastore-5bd62580-6667-45d7-a4b8-a53880bd1/hive-warehouse/orders already exists. Deleting existing data & overwriting with new data.
                                                                                

In [59]:
%%sparksql
select * from orders limit 10;


                                                                                

0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19
_hoodie_commit_time,_hoodie_commit_seqno,_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,orderid,customerid,employeeid,orderdate,requireddate,shippeddate,shipvia,freight,shipname,shipaddress,shipcity,shipregion,shippostalcode,shipcountry,ts
20250122044640228,20250122044640228_0_2491,10248,,bdf3465d-dbc2-489a-bb93-31c2b70cd628-0_0-288-1082_20250122044640228.parquet,10248,VINET,5,1996-07-04,1996-08-01,1996-07-16,3,32.38,Vins et alcools Chevalier,59 rue de l'Abbaye,Reims,,51100,France,2025-01-22 04:46:48.526051
20250122044640228,20250122044640228_0_2492,10249,,bdf3465d-dbc2-489a-bb93-31c2b70cd628-0_0-288-1082_20250122044640228.parquet,10249,TOMSP,6,1996-07-05,1996-08-16,1996-07-10,1,11.61,Toms Spezialitäten,Luisenstr. 48,Münster,,44087,Germany,2025-01-22 04:46:48.526051
20250122044640228,20250122044640228_0_2493,10250,,bdf3465d-dbc2-489a-bb93-31c2b70cd628-0_0-288-1082_20250122044640228.parquet,10250,HANAR,4,1996-07-08,1996-08-05,1996-07-12,2,65.83,Hanari Carnes,"Rua do Paço, 67",Rio de Janeiro,RJ,05454-876,Brazil,2025-01-22 04:46:48.526051
20250122044640228,20250122044640228_0_2494,10251,,bdf3465d-dbc2-489a-bb93-31c2b70cd628-0_0-288-1082_20250122044640228.parquet,10251,VICTE,3,1996-07-08,1996-08-05,1996-07-15,1,41.34,Victuailles en stock,"2, rue du Commerce",Lyon,,69004,France,2025-01-22 04:46:48.526051
20250122044640228,20250122044640228_0_2495,10252,,bdf3465d-dbc2-489a-bb93-31c2b70cd628-0_0-288-1082_20250122044640228.parquet,10252,SUPRD,4,1996-07-09,1996-08-06,1996-07-11,2,51.3,Suprêmes délices,"Boulevard Tirou, 255",Charleroi,,B-6000,Belgium,2025-01-22 04:46:48.526051
20250122044640228,20250122044640228_0_2496,10253,,bdf3465d-dbc2-489a-bb93-31c2b70cd628-0_0-288-1082_20250122044640228.parquet,10253,HANAR,3,1996-07-10,1996-07-24,1996-07-16,2,58.17,Hanari Carnes,"Rua do Paço, 67",Rio de Janeiro,RJ,05454-876,Brazil,2025-01-22 04:46:48.526051
20250122044640228,20250122044640228_0_2497,10254,,bdf3465d-dbc2-489a-bb93-31c2b70cd628-0_0-288-1082_20250122044640228.parquet,10254,CHOPS,5,1996-07-11,1996-08-08,1996-07-23,2,22.98,Chop-suey Chinese,Hauptstr. 31,Bern,,3012,Switzerland,2025-01-22 04:46:48.526051
20250122044640228,20250122044640228_0_2498,10255,,bdf3465d-dbc2-489a-bb93-31c2b70cd628-0_0-288-1082_20250122044640228.parquet,10255,RICSU,9,1996-07-12,1996-08-09,1996-07-15,3,148.33,Richter Supermarkt,Starenweg 5,Genève,,1204,Switzerland,2025-01-22 04:46:48.526051
20250122044640228,20250122044640228_0_2499,10256,,bdf3465d-dbc2-489a-bb93-31c2b70cd628-0_0-288-1082_20250122044640228.parquet,10256,WELLI,3,1996-07-15,1996-08-12,1996-07-17,2,13.97,Wellington Importadora,"Rua do Mercado, 12",Resende,SP,08737-363,Brazil,2025-01-22 04:46:48.526051


In [33]:
%%sparksql
update orders set shipcountry='Spain' where orderid=10248;


25/01/22 04:21:39 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
25/01/22 04:22:02 WARN HoodieSparkSqlWriterInternal: Closing write client       


In [34]:
%%sparksql
select * from orders limit 10;


0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19
_hoodie_commit_time,_hoodie_commit_seqno,_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,orderid,customerid,employeeid,orderdate,requireddate,shippeddate,shipvia,freight,shipname,shipaddress,shipcity,shipregion,shippostalcode,shipcountry,ts
20250122042139615,20250122042139615_0_0,10248,,0ef2c041-9652-46a8-8b55-2331f8a691ae-0_0-128-320_20250122042139615.parquet,10248,VINET,5,1996-07-04,1996-08-01,1996-07-16,3,32.38,Vins et alcools Chevalier,59 rue de l'Abbaye,Reims,,51100,Spain,2025-01-22 04:18:57.359962
20250122041848647,20250122041848647_0_832,10249,,0ef2c041-9652-46a8-8b55-2331f8a691ae-0_0-128-320_20250122042139615.parquet,10249,TOMSP,6,1996-07-05,1996-08-16,1996-07-10,1,11.61,Toms Spezialitäten,Luisenstr. 48,Münster,,44087,Germany,2025-01-22 04:18:57.359962
20250122041848647,20250122041848647_0_833,10250,,0ef2c041-9652-46a8-8b55-2331f8a691ae-0_0-128-320_20250122042139615.parquet,10250,HANAR,4,1996-07-08,1996-08-05,1996-07-12,2,65.83,Hanari Carnes,"Rua do Paço, 67",Rio de Janeiro,RJ,05454-876,Brazil,2025-01-22 04:18:57.359962
20250122041848647,20250122041848647_0_834,10251,,0ef2c041-9652-46a8-8b55-2331f8a691ae-0_0-128-320_20250122042139615.parquet,10251,VICTE,3,1996-07-08,1996-08-05,1996-07-15,1,41.34,Victuailles en stock,"2, rue du Commerce",Lyon,,69004,France,2025-01-22 04:18:57.359962
20250122041848647,20250122041848647_0_835,10252,,0ef2c041-9652-46a8-8b55-2331f8a691ae-0_0-128-320_20250122042139615.parquet,10252,SUPRD,4,1996-07-09,1996-08-06,1996-07-11,2,51.3,Suprêmes délices,"Boulevard Tirou, 255",Charleroi,,B-6000,Belgium,2025-01-22 04:18:57.359962
20250122041848647,20250122041848647_0_836,10253,,0ef2c041-9652-46a8-8b55-2331f8a691ae-0_0-128-320_20250122042139615.parquet,10253,HANAR,3,1996-07-10,1996-07-24,1996-07-16,2,58.17,Hanari Carnes,"Rua do Paço, 67",Rio de Janeiro,RJ,05454-876,Brazil,2025-01-22 04:18:57.359962
20250122041848647,20250122041848647_0_837,10254,,0ef2c041-9652-46a8-8b55-2331f8a691ae-0_0-128-320_20250122042139615.parquet,10254,CHOPS,5,1996-07-11,1996-08-08,1996-07-23,2,22.98,Chop-suey Chinese,Hauptstr. 31,Bern,,3012,Switzerland,2025-01-22 04:18:57.359962
20250122041848647,20250122041848647_0_838,10255,,0ef2c041-9652-46a8-8b55-2331f8a691ae-0_0-128-320_20250122042139615.parquet,10255,RICSU,9,1996-07-12,1996-08-09,1996-07-15,3,148.33,Richter Supermarkt,Starenweg 5,Genève,,1204,Switzerland,2025-01-22 04:18:57.359962
20250122041848647,20250122041848647_0_839,10256,,0ef2c041-9652-46a8-8b55-2331f8a691ae-0_0-128-320_20250122042139615.parquet,10256,WELLI,3,1996-07-15,1996-08-12,1996-07-17,2,13.97,Wellington Importadora,"Rua do Mercado, 12",Resende,SP,08737-363,Brazil,2025-01-22 04:18:57.359962


In [44]:
# Hudi configurations
hudi_table_name = 'orders2'
hudi_options = {
    'hoodie.table.name': 'orders'
    ,'hoodie.datasource.write.recordkey.field': 'orderid' # Primary key
    ,'hoodie.datasource.write.partitionpath.field': ''
    ,'hoodie.datasource.write.table.type': 'MERGE_ON_READ'
    ,'hoodie.datasource.write.operation': 'upsert'  # Use 'insert' for initial load
    ,'hoodie.compact.inline': 'false'  # Disable inline compaction for MoR
    ,'hoodie.compact.async.enable': 'true'  # Enable asynchronous compaction
    ,'hoodie.cleaner.policy': 'KEEP_LATEST_FILE_VERSIONS'  # Data retention policy
    ,'hoodie.cleaner.fileversions.retained': '2'  # Number of versions to keep
    # ,'hoodie.deltastreamer.schemaprovider.source.schema': orders.schema.json() # Important for schema evolution with deltastreamer
}

# Write the DataFrame to HudiTable
orders.write.format("hudi").options(**hudi_options).mode("overwrite").saveAsTable(hudi_table_name)


25/01/22 04:35:44 WARN TableSchemaResolver: Could not find any data file written for commit, so could not get schema for table gs://gcs-bucket-dataproc-metastore-5bd62580-6667-45d7-a4b8-a53880bd1/hive-warehouse/orders2
25/01/22 04:35:44 WARN HoodieSparkSqlWriterInternal: hoodie table at gs://gcs-bucket-dataproc-metastore-5bd62580-6667-45d7-a4b8-a53880bd1/hive-warehouse/orders2 already exists. Deleting existing data & overwriting with new data.
                                                                                

In [45]:
%%sparksql
select * from orders limit 10;


                                                                                

0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19
_hoodie_commit_time,_hoodie_commit_seqno,_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,orderid,customerid,employeeid,orderdate,requireddate,shippeddate,shipvia,freight,shipname,shipaddress,shipcity,shipregion,shippostalcode,shipcountry,ts
20250122042813110,20250122042813110_0_0,10248,,0ef2c041-9652-46a8-8b55-2331f8a691ae-0_0-213-806_20250122042813110.parquet,10248,VINET,5,1996-07-04,1996-08-01,1996-07-16,3,32.38,Vins et alcools Chevalier,59 rue de l'Abbaye,Reims,,51100,Spain,2025-01-22 04:18:57.359962
20250122041848647,20250122041848647_0_832,10249,,0ef2c041-9652-46a8-8b55-2331f8a691ae-0_0-213-806_20250122042813110.parquet,10249,TOMSP,6,1996-07-05,1996-08-16,1996-07-10,1,11.61,Toms Spezialitäten,Luisenstr. 48,Münster,,44087,Germany,2025-01-22 04:18:57.359962
20250122041848647,20250122041848647_0_833,10250,,0ef2c041-9652-46a8-8b55-2331f8a691ae-0_0-213-806_20250122042813110.parquet,10250,HANAR,4,1996-07-08,1996-08-05,1996-07-12,2,65.83,Hanari Carnes,"Rua do Paço, 67",Rio de Janeiro,RJ,05454-876,Brazil,2025-01-22 04:18:57.359962
20250122041848647,20250122041848647_0_834,10251,,0ef2c041-9652-46a8-8b55-2331f8a691ae-0_0-213-806_20250122042813110.parquet,10251,VICTE,3,1996-07-08,1996-08-05,1996-07-15,1,41.34,Victuailles en stock,"2, rue du Commerce",Lyon,,69004,France,2025-01-22 04:18:57.359962
20250122041848647,20250122041848647_0_835,10252,,0ef2c041-9652-46a8-8b55-2331f8a691ae-0_0-213-806_20250122042813110.parquet,10252,SUPRD,4,1996-07-09,1996-08-06,1996-07-11,2,51.3,Suprêmes délices,"Boulevard Tirou, 255",Charleroi,,B-6000,Belgium,2025-01-22 04:18:57.359962
20250122041848647,20250122041848647_0_836,10253,,0ef2c041-9652-46a8-8b55-2331f8a691ae-0_0-213-806_20250122042813110.parquet,10253,HANAR,3,1996-07-10,1996-07-24,1996-07-16,2,58.17,Hanari Carnes,"Rua do Paço, 67",Rio de Janeiro,RJ,05454-876,Brazil,2025-01-22 04:18:57.359962
20250122041848647,20250122041848647_0_837,10254,,0ef2c041-9652-46a8-8b55-2331f8a691ae-0_0-213-806_20250122042813110.parquet,10254,CHOPS,5,1996-07-11,1996-08-08,1996-07-23,2,22.98,Chop-suey Chinese,Hauptstr. 31,Bern,,3012,Switzerland,2025-01-22 04:18:57.359962
20250122041848647,20250122041848647_0_838,10255,,0ef2c041-9652-46a8-8b55-2331f8a691ae-0_0-213-806_20250122042813110.parquet,10255,RICSU,9,1996-07-12,1996-08-09,1996-07-15,3,148.33,Richter Supermarkt,Starenweg 5,Genève,,1204,Switzerland,2025-01-22 04:18:57.359962
20250122041848647,20250122041848647_0_839,10256,,0ef2c041-9652-46a8-8b55-2331f8a691ae-0_0-213-806_20250122042813110.parquet,10256,WELLI,3,1996-07-15,1996-08-12,1996-07-17,2,13.97,Wellington Importadora,"Rua do Mercado, 12",Resende,SP,08737-363,Brazil,2025-01-22 04:18:57.359962


In [62]:
%%sparksql
update orders set shipcountry='Spain' where orderid=10248;


25/01/22 04:52:33 WARN HoodieSparkSqlWriterInternal: Closing write client       


In [47]:
%%sparksql
select * from orders limit 10;


0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19
_hoodie_commit_time,_hoodie_commit_seqno,_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,orderid,customerid,employeeid,orderdate,requireddate,shippeddate,shipvia,freight,shipname,shipaddress,shipcity,shipregion,shippostalcode,shipcountry,ts
20250122043641024,20250122043641024_0_0,10248,,0ef2c041-9652-46a8-8b55-2331f8a691ae-0_0-257-1051_20250122043641024.parquet,10248,VINET,5,1996-07-04,1996-08-01,1996-07-16,3,32.38,Vins et alcools Chevalier,59 rue de l'Abbaye,Reims,,51100,Spain,2025-01-22 04:18:57.359962
20250122041848647,20250122041848647_0_832,10249,,0ef2c041-9652-46a8-8b55-2331f8a691ae-0_0-257-1051_20250122043641024.parquet,10249,TOMSP,6,1996-07-05,1996-08-16,1996-07-10,1,11.61,Toms Spezialitäten,Luisenstr. 48,Münster,,44087,Germany,2025-01-22 04:18:57.359962
20250122041848647,20250122041848647_0_833,10250,,0ef2c041-9652-46a8-8b55-2331f8a691ae-0_0-257-1051_20250122043641024.parquet,10250,HANAR,4,1996-07-08,1996-08-05,1996-07-12,2,65.83,Hanari Carnes,"Rua do Paço, 67",Rio de Janeiro,RJ,05454-876,Brazil,2025-01-22 04:18:57.359962
20250122041848647,20250122041848647_0_834,10251,,0ef2c041-9652-46a8-8b55-2331f8a691ae-0_0-257-1051_20250122043641024.parquet,10251,VICTE,3,1996-07-08,1996-08-05,1996-07-15,1,41.34,Victuailles en stock,"2, rue du Commerce",Lyon,,69004,France,2025-01-22 04:18:57.359962
20250122041848647,20250122041848647_0_835,10252,,0ef2c041-9652-46a8-8b55-2331f8a691ae-0_0-257-1051_20250122043641024.parquet,10252,SUPRD,4,1996-07-09,1996-08-06,1996-07-11,2,51.3,Suprêmes délices,"Boulevard Tirou, 255",Charleroi,,B-6000,Belgium,2025-01-22 04:18:57.359962
20250122041848647,20250122041848647_0_836,10253,,0ef2c041-9652-46a8-8b55-2331f8a691ae-0_0-257-1051_20250122043641024.parquet,10253,HANAR,3,1996-07-10,1996-07-24,1996-07-16,2,58.17,Hanari Carnes,"Rua do Paço, 67",Rio de Janeiro,RJ,05454-876,Brazil,2025-01-22 04:18:57.359962
20250122041848647,20250122041848647_0_837,10254,,0ef2c041-9652-46a8-8b55-2331f8a691ae-0_0-257-1051_20250122043641024.parquet,10254,CHOPS,5,1996-07-11,1996-08-08,1996-07-23,2,22.98,Chop-suey Chinese,Hauptstr. 31,Bern,,3012,Switzerland,2025-01-22 04:18:57.359962
20250122041848647,20250122041848647_0_838,10255,,0ef2c041-9652-46a8-8b55-2331f8a691ae-0_0-257-1051_20250122043641024.parquet,10255,RICSU,9,1996-07-12,1996-08-09,1996-07-15,3,148.33,Richter Supermarkt,Starenweg 5,Genève,,1204,Switzerland,2025-01-22 04:18:57.359962
20250122041848647,20250122041848647_0_839,10256,,0ef2c041-9652-46a8-8b55-2331f8a691ae-0_0-257-1051_20250122043641024.parquet,10256,WELLI,3,1996-07-15,1996-08-12,1996-07-17,2,13.97,Wellington Importadora,"Rua do Mercado, 12",Resende,SP,08737-363,Brazil,2025-01-22 04:18:57.359962


In [55]:
from pyspark.sql.functions import col

df = spark.table("orders")

# Specify the record key you're interested in
record_key_to_find = "10248"

# Find all versions of the record
record_versions = df.filter(col("_hoodie_record_key") == record_key_to_find).orderBy(col("_hoodie_commit_time").desc())

# Show all versions of the record
record_versions.show(truncate=False)

# Get the second most recent version (the previous version)
previous_version = record_versions.limit(2).tail(1)[0]
#convert to dataframe to use display
previous_version_df = spark.createDataFrame([previous_version], record_versions.schema)

display(previous_version_df.toPandas())

#or just show it
print("previous version")
print(previous_version)


+-------------------+---------------------+------------------+----------------------+---------------------------------------------------------------------------+-------+----------+----------+----------+------------+-----------+-------+-------+-------------------------+------------------+--------+----------+--------------+-----------+--------------------------+
|_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name                                                          |orderid|customerid|employeeid|orderdate |requireddate|shippeddate|shipvia|freight|shipname                 |shipaddress       |shipcity|shipregion|shippostalcode|shipcountry|ts                        |
+-------------------+---------------------+------------------+----------------------+---------------------------------------------------------------------------+-------+----------+----------+----------+------------+-----------+-------+-------+-------------------------+-----

                                                                                

Unnamed: 0,_hoodie_commit_time,_hoodie_commit_seqno,_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,orderid,customerid,employeeid,orderdate,requireddate,shippeddate,shipvia,freight,shipname,shipaddress,shipcity,shipregion,shippostalcode,shipcountry,ts
0,20250122043641024,20250122043641024_0_0,10248,,0ef2c041-9652-46a8-8b55-2331f8a691ae-0_0-257-1...,10248,VINET,5,1996-07-04,1996-08-01,1996-07-16,3,32.38,Vins et alcools Chevalier,59 rue de l'Abbaye,Reims,,51100,Spain,2025-01-22 04:18:57.359962


previous version
Row(_hoodie_commit_time='20250122043641024', _hoodie_commit_seqno='20250122043641024_0_0', _hoodie_record_key='10248', _hoodie_partition_path='', _hoodie_file_name='0ef2c041-9652-46a8-8b55-2331f8a691ae-0_0-257-1051_20250122043641024.parquet', orderid=10248, customerid='VINET', employeeid=5, orderdate='1996-07-04', requireddate='1996-08-01', shippeddate='1996-07-16', shipvia=3, freight=32.38, shipname='Vins et alcools Chevalier', shipaddress="59 rue de l'Abbaye", shipcity='Reims', shipregion=None, shippostalcode='51100', shipcountry='Spain', ts=datetime.datetime(2025, 1, 22, 4, 18, 57, 359962))


In [63]:
%%sparksql
select * from orders where orderid = 10248

0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19
_hoodie_commit_time,_hoodie_commit_seqno,_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,orderid,customerid,employeeid,orderdate,requireddate,shippeddate,shipvia,freight,shipname,shipaddress,shipcity,shipregion,shippostalcode,shipcountry,ts
20250122045211776,20250122045211776_0_0,10248,,bdf3465d-dbc2-489a-bb93-31c2b70cd628-0_0-307-1302_20250122045211776.parquet,10248,VINET,5,1996-07-04,1996-08-01,1996-07-16,3,32.38,Vins et alcools Chevalier,59 rue de l'Abbaye,Reims,,51100,Spain,2025-01-22 04:46:48.526051


In [64]:
%%sparksql
select * from orders TIMESTAMP as of '2025-01-22 04:46:40' where orderid = 10248

0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19
_hoodie_commit_time,_hoodie_commit_seqno,_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,orderid,customerid,employeeid,orderdate,requireddate,shippeddate,shipvia,freight,shipname,shipaddress,shipcity,shipregion,shippostalcode,shipcountry,ts
20250122044640228,20250122044640228_0_2491,10248,,bdf3465d-dbc2-489a-bb93-31c2b70cd628-0_0-288-1082_20250122044640228.parquet,10248,VINET,5,1996-07-04,1996-08-01,1996-07-16,3,32.38,Vins et alcools Chevalier,59 rue de l'Abbaye,Reims,,51100,France,2025-01-22 04:46:48.526051


## You can query an existing Hudi table.

## Read in a file to a Spark DataFrame.

In [11]:
categories = spark.read.csv(f'{SOURCE_PATH}/northwind/CSVHeaders/categories', header=True)
categories.show()


+----------+--------------+--------------------+
|CategoryID|  CategoryName|         Description|
+----------+--------------+--------------------+
|         1|     Beverages|Soft drinks coffe...|
|         2|    Condiments|Sweet and savory ...|
|         3|   Confections|Desserts candies ...|
|         4|Dairy Products|             Cheeses|
|         5|Grains/Cereals|Breads crackers p...|
|         6|  Meat/Poultry|      Prepared meats|
|         7|       Produce|Dried fruit and b...|
|         8|       Seafood|    Seaweed and fish|
+----------+--------------+--------------------+



## Use createOrReplaceTempView to create a virtual table in the Hive catalog and then it can be queried using SQL as if it were a hive table.

In [12]:
categories.createOrReplaceTempView('categories')
t1 =spark.sql('select * from categories where categoryid = 1')
t1.show()
print(t1.count())


+----------+------------+--------------------+
|CategoryID|CategoryName|         Description|
+----------+------------+--------------------+
|         1|   Beverages|Soft drinks coffe...|
+----------+------------+--------------------+

1


### The create table as syntax also works

In [None]:
spark.sql('create table categories2 as select * from categories')


## Queries use standard HQL to mix Hive tables and virtual tables. Both are read into a Spark DataFrame and the processing happens at the Spark level, not at the Hive level. HQL is just used to parse the logic into the corresponding Spark methods.

In [18]:
sql = """
SELECT c.categoryid, c.categoryname, p.productid, p.productname
from categories as c
join products as p on c.categoryid = p.categoryid
order by c.categoryid, p.productid
"""
df = spark.sql(sql)
# df.show(10)

df2 = categories.join(prod, categories.CategoryID == prod.productid) \
                .select(categories.CategoryID, 'categoryname', 'productid', 'productname') \
                .orderBy(categories.CategoryID, 'productid')

df2.show(10)



+----------+--------------+---------+--------------------+
|CategoryID|  categoryname|productid|         productname|
+----------+--------------+---------+--------------------+
|         1|     Beverages|        1|                Chai|
|         2|    Condiments|        2|               Chang|
|         3|   Confections|        3|       Aniseed Syrup|
|         4|Dairy Products|        4|Chef Anton's Caju...|
|         5|Grains/Cereals|        5|Chef Anton's Gumb...|
|         6|  Meat/Poultry|        6|Grandma's Boysenb...|
|         7|       Produce|        7|Uncle Bob's Organ...|
|         8|       Seafood|        8|Northwoods Cranbe...|
+----------+--------------+---------+--------------------+



## Creating the regions2 DataFrame does not execute anything yet, but by making the DataFrame into a Temp View then running a Spark SQL query, it tells Spark to read the SQL data into a DataFrame and then use the cluster to do the processing, not the SQL source.

In [None]:
regions2.createOrReplaceTempView('regions2')
spark.sql('select * from regions2 where regionid < 3').show()

In [None]:
spark.read.table('regions2').where('regionid < 3').show()

## Alternate ways to code a query using SQL and methods.

In [None]:
print(spark.sql('select count(*) from regions').collect())
spark.sql('select * from regions').count()

## Using SQL you can use familiar syntax instead of withColumn or withColumnRenamed methods.
Note the expr function needs to be imported when you want to use a stringified SQL function using dot syntax.

In [None]:
from pyspark.sql.functions import expr

t1 = spark.sql('select TerritoryID as TerrID, UPPER(TerritoryName) as TerritoryName, RegionID from territories')
t1.show(5)

from pyspark.sql.functions import expr
territories.withColumn('TerritoryName', expr('UPPER(TerritoryName)')).withColumnRenamed('TerritoryID', 'TerrID').show(5)


In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *

# This won't work though if you want to use python functions, you need to go another step
# territories.withColumn('TerritoryName', territories.TerritoryName.upper()).show()

# You need to make the python function callable by spark by wrapping it in the udf function
# which tells spark what datatype it returns
territories.withColumn('TerritoryName', udf(str.upper, StringType())(territories.TerritoryName)).show()



## If you want to use a function that is not a standard Python or SQL function, you can always create one in Python and make it callable from Spark.

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *

def inventoryvalue(quantity, price):
    return quantity * price

# Turn the Python function into a Spark callable function
invvalue = udf(inventoryvalue, FloatType())
p = products
p2 = p.withColumn('value', invvalue(p.unitsinstock, p.unitprice))
display(p2)




## Python decorators are an even better option.

In [None]:
@udf(FloatType())
def inventoryvalue(quantity, price):
    return quantity * price

p2 = p.withColumn('value', inventoryvalue(p.unitsinstock, p.unitprice))
display(p2)


In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *

def inventoryvalue(quantity, price):
    return quantity * price

# Or dynamically wrap it, but it's harder to read
p2 = p.withColumn('value', udf(inventoryvalue, FloatType())(p.unitsinstock, p.unitprice))
display(p2)



## To make it easier though, you could make the Python function into a udf that SQL can understand similar to how you can make a DataFrame seem like a virtual table with createOrReplaceTempView.

In [None]:
def reverseString(x):
    return x[::-1]

spark.udf.register('reverse', reverseString, StringType())

spark.sql('select *, reverse(TerritoryName) as Reversed from Territories').orderBy('Reversed').show()

## HQL has collect_set and collect_list functions to aggregate items into a list instead of summing them up. 

In [None]:
from pyspark.sql.functions import collect_list
territories.groupBy(territories.RegionID).agg(collect_list(territories.TerritoryName)).show()

tr1 = spark.sql("SELECT RegionID, collect_list(TerritoryName) AS TerritoryList FROM Territories GROUP BY RegionID")
tr1.show()
tr1.printSchema()
print(tr1.take(1))


## Instead of a simple datatype, you could also collect complex structured objects using the HQL NAMED_STRUCT.

In [None]:
sql = """
SELECT r.RegionID, r.RegionName
, COLLECT_SET(NAMED_STRUCT("TerritoryID", TerritoryID, "TerritoryName", TerritoryName)) AS TerritoryList
FROM Regions AS r
JOIN Territories AS t ON r.RegionID = t.RegionID
GROUP BY r.RegionID, r.RegionName
ORDER BY r.RegionID
"""

tr2 = spark.sql(sql)
tr2.printSchema()
print(tr2)
tr2.show()
print(tr2.take(2))
tr2.write.json('TerritoryRegion.json')
spark.sql('create table TerritoryRegion as ' + sql)

## If you have data that is already collected into a complex datatype and want to flatten it, you could use HQL EXPLODE function.

## You could use the Spark explode method.

In [None]:
from pyspark.sql.functions import explode
tr1.select('RegionID', explode('TerritoryList')).show()


## Or if the DataFrame is turned into a Temp View, you could use the HQL query to do it.

In [None]:
tr1.createOrReplaceTempView('RegionTerritories')
sql = """
SELECT RegionID, TerritoryName
FROM RegionTerritories
LATERAL VIEW EXPLODE(TerritoryList) EXPLODED_TABLE AS TerritoryName
ORDER BY RegionID, TerritoryName
"""
spark.sql(sql).show()

## Or you could select specific elements from a collection.

In [None]:
tr2.createOrReplaceTempView('RegionTerritories')
spark.sql("select RegionId, RegionName, TerritoryList[0] as First, TerritoryList[size(TerritoryList) - 1] as Last, size(TerritoryList) as TerritoryCount from RegionTerritories").show()


## If the array is of structs, note the syntax of fetching the elements from the struct uses the . like an object property.

In [None]:
sql = """
SELECT RegionID, RegionName, Territory.TerritoryID AS TerritoryID
, Territory.TerritoryName AS TerritoryName
FROM RegionTerritories
LATERAL VIEW EXPLODE(TerritoryList) EXPLODED_TABLE AS Territory
"""
spark.sql(sql).show()


## HOMEWORK: ## 
**First Challenge**

Create a Python function to determine if a number is odd or even and use that to select only the even numbered shippers from the TSV folder of northwind. Note the TSV file does not have headers so you will need to do something to make the DataFrame have a meaningful structure. I would suggest using Spark SQL as much as possible to rename and cast the columns which are ShipperID, CompanyName, and Phone.

**Second Challenge**

Take the Order_LineItems.json folder, read it into a DataFrame, and flatten it and then calculate the average price paid for a product.

<br>
<details><summary>Click for <b>hint</b></summary>
<p>
Take a look at the MakeOrders_LineItems.py file provided to see how the Order_LineItems.json was generated in the first place
<br>
Use modulus with remainder of zero to determine if something is even
<br>
Use udf to make a version of the function that is callable using dot syntax and udf.register to make a version callable from within a SQL string
<br>
Use LATERAL VIEW EXPLODE() EXPLODED_TABLE to flatten out the nested format file
<br>
Once flattened do a traditional aggregate to calculate the average
<br>
<br>
</p>
</details>



