In [1]:
import pyspark
import re
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, DateType, IntegerType, DoubleType

Install findspark to make available spark from jupyter-python

In [2]:
#!conda insstall openjdk
#!conda install findspark
#install hadoop:
#https://github.com/ruslanmv/How-to-install-Hadoop-on-Windows?tab=readme-ov-file

In [3]:
import findspark
findspark.init()
findspark.find()

'c:\\soft\\miniconda3\\envs\\iceberg_env\\lib\\site-packages\\pyspark'

In [4]:
!pyspark --version

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.4.1
      /_/
                        
Using Scala version 2.12.17, OpenJDK 64-Bit Server VM, 11.0.13
Branch HEAD
Compiled by user centos on 2023-06-19T23:01:01Z
Revision 6b1ff22dde1ead51cbf370be6e48a802daae58b6
Url https://github.com/apache/spark
Type --help for more information.


In [5]:
#spark.stop()

In [6]:
catalog_nm = "flights_catalog"
layer_nm = "silver" #options: "silver", "gold"
warehouse_path = f"file:/C:/ws/github/iceberg-lakehouse/warehouse_flights/{layer_nm}"
sales_table_nm = "sales"

conf = (
    pyspark.SparkConf()
    .setAppName("iceberg_flights_app")
    #packages
    .set("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.5.2") #spark: 3.4, scala:2.12 iceberg:1.5.2
    #SQL extensions
    .set("spark.sql.extensions","org.apache.iceberg.spark.extensions:IcebergSparkSessionExtensions")
    #Eager evaluation
    .set('spark.sql.repl.eagerEval.enabled', True)
    #Configuring catalog
    .set(f"spark.sql.catalog.{catalog_nm}","org.apache.iceberg.spark.SparkCatalog")
    .set(f"spark.sql.catalog.{catalog_nm}.type","hadoop")
    .set(f"spark.sql.catalog.{catalog_nm}.warehouse",warehouse_path)
)

#Start SparkSession
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Running")

Spark Running


In [7]:
spark

Schema declaration

In [8]:
staging_in_schema = StructType([
    StructField("transaction_id", StringType(), nullable=False),
    StructField("customer_id", StringType(), nullable=False),
    StructField("transaction_timestamp", StringType(), nullable=False),
    StructField("price", StringType(), nullable=False),
    StructField("currency", StringType(), nullable=False),
    StructField("flight_id", StringType(), nullable=False),
    StructField("booking_reference", StringType(), nullable=True)
])

In [9]:
staging_path = "file:/C:/ws/github/iceberg-lakehouse/staging/"
date_str = "20130701"
file_nm = f"sellings_{date_str}_MADJHT.csv"
staging_file_path = staging_path + file_nm
#staging_df = spark.read.format('csv').option("header","true").load(staging_file_path)
#staging_df = spark.read.csv(staging_file_path, header=True, nanValue="N/A",mode="FAILFAST",schema=staging_in_schema)
staging_df = (
    spark.read.format('csv')
    .option("header","true")
    .schema(staging_in_schema)
    .load(staging_file_path)
)

In [10]:
staging_df.printSchema()

root
 |-- transaction_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- transaction_timestamp: string (nullable = true)
 |-- price: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- flight_id: string (nullable = true)
 |-- booking_reference: string (nullable = true)



In [11]:
staging_df.show()

+--------------+-----------+---------------------+------+--------+---------+-----------------+
|transaction_id|customer_id|transaction_timestamp| price|currency|flight_id|booking_reference|
+--------------+-----------+---------------------+------+--------+---------+-----------------+
|   TRANS000001|   CUST0011|  2023-07-01 06:24:38| 94.42|     JPY|FLIGHT244|           YCQFNE|
|   TRANS000002|   CUST0008|  2023-07-01 09:28:36|218.85|     AUD|FLIGHT216|           MHT3NF|
|   TRANS000003|   CUST0014|  2023-07-01 11:03:03|386.81|     JPY|FLIGHT283|           O2XX7D|
|   TRANS000004|   CUST0017|  2023-07-01 03:05:35|124.76|     EUR|FLIGHT383|           EERBGZ|
|   TRANS000005|   CUST0011|  2023-07-01 20:54:31| 716.0|     USD|FLIGHT880|           299JQ9|
|   TRANS000006|   CUST0010|  2023-07-01 10:13:58|323.22|     GBP|FLIGHT594|           NN6EKT|
|   TRANS000007|   CUST0002|  2023-07-01 17:02:26|663.38|     JPY|FLIGHT786|           GZMGEY|
|   TRANS000008|   CUST0001|  2023-07-01 22:31:49|

In [12]:
staging_df.count()

100

Calculate columns for partitioning:year_ptt, month_ptt, year_ptt

In [13]:
#Extract transaction year, month and day from the csv filename
print(file_nm)
#file_nm2= "selkjdkjfijelskjlkjledkjfe_201307901_eldkfkendnknd.csv"
matched = re.search("[^_]*_(\\d{4})(\\d{2})(\\d{2})_[^\.]*\.csv",file_nm)
if matched:
    year_ptt = matched.group(1)
    month_ptt = matched.group(2)
    day_ptt = matched.group(3)
    print(f"{year_ptt}-{month_ptt}-{day_ptt}")
else:
    raise ValueError("Incorrect file name")

sellings_20130701_MADJHT.csv
2013-07-01


In [14]:
staging_df = (
    staging_df.withColumn('year_ptt',F.lit(year_ptt).cast(IntegerType()))
    .withColumn('month_ptt',F.lit(month_ptt).cast(IntegerType()))
    .withColumn('day_ptt',F.lit(day_ptt).cast(IntegerType()))
)

In [15]:
staging_df.printSchema()

root
 |-- transaction_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- transaction_timestamp: string (nullable = true)
 |-- price: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- flight_id: string (nullable = true)
 |-- booking_reference: string (nullable = true)
 |-- year_ptt: integer (nullable = true)
 |-- month_ptt: integer (nullable = true)
 |-- day_ptt: integer (nullable = true)



In [16]:
staging_df.show()

+--------------+-----------+---------------------+------+--------+---------+-----------------+--------+---------+-------+
|transaction_id|customer_id|transaction_timestamp| price|currency|flight_id|booking_reference|year_ptt|month_ptt|day_ptt|
+--------------+-----------+---------------------+------+--------+---------+-----------------+--------+---------+-------+
|   TRANS000001|   CUST0011|  2023-07-01 06:24:38| 94.42|     JPY|FLIGHT244|           YCQFNE|    2013|        7|      1|
|   TRANS000002|   CUST0008|  2023-07-01 09:28:36|218.85|     AUD|FLIGHT216|           MHT3NF|    2013|        7|      1|
|   TRANS000003|   CUST0014|  2023-07-01 11:03:03|386.81|     JPY|FLIGHT283|           O2XX7D|    2013|        7|      1|
|   TRANS000004|   CUST0017|  2023-07-01 03:05:35|124.76|     EUR|FLIGHT383|           EERBGZ|    2013|        7|      1|
|   TRANS000005|   CUST0011|  2023-07-01 20:54:31| 716.0|     USD|FLIGHT880|           299JQ9|    2013|        7|      1|
|   TRANS000006|   CUST0

In [17]:
#staging_df = staging_df.withColumn('operation_date',F.to_date(staging_df.operation_date_str,'yyyyMMdd').alias('operation_date'))
#staging_df.show()

Dataquality

In [18]:
[c for c in staging_df.columns]

['transaction_id',
 'customer_id',
 'transaction_timestamp',
 'price',
 'currency',
 'flight_id',
 'booking_reference',
 'year_ptt',
 'month_ptt',
 'day_ptt']

In [19]:
#Search for null values
staging_df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in staging_df.columns]).show()

+--------------+-----------+---------------------+-----+--------+---------+-----------------+--------+---------+-------+
|transaction_id|customer_id|transaction_timestamp|price|currency|flight_id|booking_reference|year_ptt|month_ptt|day_ptt|
+--------------+-----------+---------------------+-----+--------+---------+-----------------+--------+---------+-------+
|             0|          0|                    0|    0|       0|        0|                0|       0|        0|      0|
+--------------+-----------+---------------------+-----+--------+---------+-----------------+--------+---------+-------+



In [20]:
#Search for duplicates
df_count = staging_df.count()

In [21]:
df_distinct_count = staging_df.distinct().count()

In [22]:
duplicates = df_count-df_distinct_count
assert duplicates == 0, f"There are {duplicates} records duplicate"

In [23]:
#Format validation

In [24]:
transaction_id_format_error_df = staging_df.select(F.count(F.when(~F.col('transaction_id').rlike(r'TRANS\d{6}'), True)).alias('count'))

In [25]:
transaction_id_format_error_count = transaction_id_format_error_df.collect()[0][0]
assert transaction_id_format_error_count == 0,f"transaction_id invalid format count={transaction_id_format_error_count}"

Creation of the final DataFrame with the correct schema

In [26]:
staging_df_schema = StructType([
    StructField("transaction_id", StringType(), nullable=False),
    StructField("customer_id", StringType(), nullable=False),
    StructField("transaction_timestamp", StringType(), nullable=False),
    StructField("price", StringType(), nullable=False),
    StructField("currency", StringType(), nullable=False),
    StructField("flight_id", StringType(), nullable=False),
    StructField("booking_reference", StringType(), nullable=True),
    StructField("year_ptt", IntegerType(), nullable=False),
    StructField("month_ptt", IntegerType(), nullable=False),
    StructField("day_ptt", IntegerType(), nullable=False)
])

In [27]:
final_staging_df = spark.createDataFrame(staging_df.collect(),schema=staging_df_schema)

In [28]:
final_staging_df.printSchema()

root
 |-- transaction_id: string (nullable = false)
 |-- customer_id: string (nullable = false)
 |-- transaction_timestamp: string (nullable = false)
 |-- price: string (nullable = false)
 |-- currency: string (nullable = false)
 |-- flight_id: string (nullable = false)
 |-- booking_reference: string (nullable = true)
 |-- year_ptt: integer (nullable = false)
 |-- month_ptt: integer (nullable = false)
 |-- day_ptt: integer (nullable = false)



In [29]:
final_staging_df.show()

+--------------+-----------+---------------------+------+--------+---------+-----------------+--------+---------+-------+
|transaction_id|customer_id|transaction_timestamp| price|currency|flight_id|booking_reference|year_ptt|month_ptt|day_ptt|
+--------------+-----------+---------------------+------+--------+---------+-----------------+--------+---------+-------+
|   TRANS000001|   CUST0011|  2023-07-01 06:24:38| 94.42|     JPY|FLIGHT244|           YCQFNE|    2013|        7|      1|
|   TRANS000002|   CUST0008|  2023-07-01 09:28:36|218.85|     AUD|FLIGHT216|           MHT3NF|    2013|        7|      1|
|   TRANS000003|   CUST0014|  2023-07-01 11:03:03|386.81|     JPY|FLIGHT283|           O2XX7D|    2013|        7|      1|
|   TRANS000004|   CUST0017|  2023-07-01 03:05:35|124.76|     EUR|FLIGHT383|           EERBGZ|    2013|        7|      1|
|   TRANS000005|   CUST0011|  2023-07-01 20:54:31| 716.0|     USD|FLIGHT880|           299JQ9|    2013|        7|      1|
|   TRANS000006|   CUST0

Write option 1: Spark DataFrame method

In [30]:
sales_tb = f"{catalog_nm}.{sales_table_nm}"
final_staging_df.writeTo(sales_tb).overwritePartitions()
#final_staging_df.writeTo(sales_tb).append()

In [31]:
spark.sql(
    f"""
    SELECT * 
    FROM {sales_tb}
    LIMIT 10;
    """
).show()

+--------------+-----------+---------------------+------+--------+---------+-----------------+--------+---------+-------+
|transaction_id|customer_id|transaction_timestamp| price|currency|flight_id|booking_reference|year_ptt|month_ptt|day_ptt|
+--------------+-----------+---------------------+------+--------+---------+-----------------+--------+---------+-------+
|   TRANS000001|   CUST0011|  2023-07-01 06:24:38| 94.42|     JPY|FLIGHT244|           YCQFNE|    2013|        7|      1|
|   TRANS000002|   CUST0008|  2023-07-01 09:28:36|218.85|     AUD|FLIGHT216|           MHT3NF|    2013|        7|      1|
|   TRANS000003|   CUST0014|  2023-07-01 11:03:03|386.81|     JPY|FLIGHT283|           O2XX7D|    2013|        7|      1|
|   TRANS000004|   CUST0017|  2023-07-01 03:05:35|124.76|     EUR|FLIGHT383|           EERBGZ|    2013|        7|      1|
|   TRANS000005|   CUST0011|  2023-07-01 20:54:31| 716.0|     USD|FLIGHT880|           299JQ9|    2013|        7|      1|
|   TRANS000006|   CUST0

In [32]:
spark.sql(
    f"""
    SELECT COUNT(*) 
    FROM {catalog_nm}.{sales_table_nm};
    """
).show()

+--------+
|count(1)|
+--------+
|     100|
+--------+



Write option 2: Spark SQL (UPSERT)

In [33]:
#Create a temporary view for the dataframe
staging_df_view = "staging_df"
final_staging_df.createOrReplaceTempView(staging_df_view)

In [34]:
#show dataframe contents using Spark SQL
spark.sql(
    f"""
    SELECT * 
    FROM {staging_df_view};
    """
).show()

+--------------+-----------+---------------------+------+--------+---------+-----------------+--------+---------+-------+
|transaction_id|customer_id|transaction_timestamp| price|currency|flight_id|booking_reference|year_ptt|month_ptt|day_ptt|
+--------------+-----------+---------------------+------+--------+---------+-----------------+--------+---------+-------+
|   TRANS000001|   CUST0011|  2023-07-01 06:24:38| 94.42|     JPY|FLIGHT244|           YCQFNE|    2013|        7|      1|
|   TRANS000002|   CUST0008|  2023-07-01 09:28:36|218.85|     AUD|FLIGHT216|           MHT3NF|    2013|        7|      1|
|   TRANS000003|   CUST0014|  2023-07-01 11:03:03|386.81|     JPY|FLIGHT283|           O2XX7D|    2013|        7|      1|
|   TRANS000004|   CUST0017|  2023-07-01 03:05:35|124.76|     EUR|FLIGHT383|           EERBGZ|    2013|        7|      1|
|   TRANS000005|   CUST0011|  2023-07-01 20:54:31| 716.0|     USD|FLIGHT880|           299JQ9|    2013|        7|      1|
|   TRANS000006|   CUST0

In [35]:
#Upsert
spark.sql(
    f"""
    MERGE INTO {catalog_nm}.{sales_table_nm} AS t
    USING (SELECT * FROM {staging_df_view}) AS s
    ON s.transaction_id = t.transaction_id
        AND t.year_ptt = {year_ptt}    --push down filters
        AND t.month_ptt = {month_ptt}
        AND t.day_ptt = {day_ptt}
    WHEN MATCHED THEN
        UPDATE SET
           t.customer_id = s.customer_id,
           t.transaction_timestamp = s.transaction_timestamp,
           t.price = s.price,
           t.currency = s.currency,
           t.flight_id = s.flight_id,
           t.booking_reference = s.booking_reference,
           t.year_ptt = s.year_ptt,
           t.month_ptt = s.month_ptt,
           t.day_ptt = s.day_ptt
    WHEN NOT MATCHED BY TARGET THEN
        INSERT(transaction_id,customer_id,transaction_timestamp,
            price,currency,flight_id,booking_reference,year_ptt,month_ptt,day_ptt)
        VALUES(s.transaction_id,s.customer_id,s.transaction_timestamp,
            s.price,s.currency,s.flight_id,s.booking_reference,s.year_ptt,s.month_ptt,s.day_ptt)
    ;
    """
)

Py4JJavaError: An error occurred while calling o39.sql.
: org.apache.spark.SparkUnsupportedOperationException: MERGE INTO TABLE is not supported temporarily.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.ddlUnsupportedTemporarilyError(QueryExecutionErrors.scala:1109)
	at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:898)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
	at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:70)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
	at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:70)
	at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:476)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:162)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:202)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:202)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:201)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:162)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:155)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:175)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:202)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:202)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:201)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:175)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:168)
	at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:221)
	at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:266)
	at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:235)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:112)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:219)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:640)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:630)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:662)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
