In [0]:
# Usado para entendimento da estrutura do JSON fornecido: 

caminho_json = "/Volumes/workspace/default/data/ERP.json"

df = spark.read.option("multiline", "true").json(caminho_json)

df.printSchema()

df.display()

root
 |-- curUTC: string (nullable = true)
 |-- guestChecks: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- balDueTtl: string (nullable = true)
 |    |    |-- chkNum: long (nullable = true)
 |    |    |-- chkTtl: double (nullable = true)
 |    |    |-- clsdBusDt: string (nullable = true)
 |    |    |-- clsdFlag: boolean (nullable = true)
 |    |    |-- clsdLcl: string (nullable = true)
 |    |    |-- clsdUTC: string (nullable = true)
 |    |    |-- detailLines: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- aggQty: long (nullable = true)
 |    |    |    |    |-- aggTtl: double (nullable = true)
 |    |    |    |    |-- busDt: string (nullable = true)
 |    |    |    |    |-- chkEmpId: long (nullable = true)
 |    |    |    |    |-- chkEmpNum: long (nullable = true)
 |    |    |    |    |-- detailLcl: string (nullable = true)
 |    |    |    |    |-- detailUTC: string (nullable = true)
 |

curUTC,guestChecks,locRef
2024-05-05T06:06:06,"List(List(null, 1234, 109.9, 2024-01-01, true, 2024-01-01T09:12:12, 2024-01-01T12:12:12, List(List(1, 119.9, 2024-01-01, 10454318, 81001, 2024-01-01T06:09:09, 2024-01-01T09:09:09, 1, 119.9, 1, null, 1, 9988776655, 2024-01-01T07:10:10, 2024-11-01T10:10:10, 1, List(28, 20.809091, 6042, false, 3), 123, 1, 1, 7)), -10, 55555, 1, 1122334455, 2024-01-01T09:12:12, 2024-01-01T12:12:12, 2024-01-01T10:13:13, 2024-01-01T13:13:13, null, 2, 3, null, 2024-01-01, 2024-01-01T06:09:09, 2024-01-01T09:09:09, 1, 109.9, 101, 109.9, List(List(20.81, 28, 21, 119.9, 3)), 90, 1))",99 CB CB


In [0]:
%sql
Table guest_check {
  guest_check_id BIGINT [pk]
  chk_num BIGINT
  chk_ttl DOUBLE
  bal_due_ttl STRING
  clsd_flag BOOLEAN
  clsd_bus_dt STRING
  clsd_lcl STRING
  clsd_utc STRING
  emp_num BIGINT
  gst_cnt BIGINT
  last_trans_lcl STRING
  last_trans_utc STRING
  last_updated_lcl STRING
  last_updated_utc STRING
  non_txbl_sls_ttl STRING
  num_chk_prntd BIGINT
  num_srvc_rd BIGINT
  oc_num STRING
  opn_bus_dt STRING
  opn_lcl STRING
  opn_utc STRING
  ot_num BIGINT
  pay_ttl DOUBLE
  rvc_num BIGINT
  sub_ttl DOUBLE
  tbl_name STRING
  tbl_num BIGINT
  loc_ref STRING
}

Table detail_line {
  dtl_id BIGINT [pk]
  guest_check_id BIGINT [ref: > guest_check.guest_check_id]
  agg_qty BIGINT
  agg_ttl DOUBLE
  bus_dt STRING
  detail_lcl STRING
  detail_utc STRING
  dsp_qty BIGINT
  dsp_ttl DOUBLE
  dtl_oc_num STRING
  dtl_ot_num BIGINT
  guest_check_line_item_id BIGINT
  last_update_lcl STRING
  last_update_utc STRING
  line_num BIGINT
  menu_item_id BIGINT [ref: > menu_item.menu_item_id]
}

Table menu_item {
  menu_item_id BIGINT [pk]
  active_taxes STRING
  incl_tax DOUBLE
  mi_num BIGINT
  mod_flag BOOLEAN
  prc_lvl BIGINT
  rvc_num BIGINT
  seat_num BIGINT
  svc_rnd_num BIGINT
  ws_num BIGINT
}

Table tax {
  tax_id BIGINT [pk]
  guest_check_id BIGINT [ref: > guest_check.guest_check_id]
  tax_coll_ttl DOUBLE
  tax_num BIGINT
  tax_rate DOUBLE
}

Table discount {
  detail_line_id BIGINT [pk, ref: > detail_line.dtl_id]
  amount DOUBLE
  reason STRING
}

Table service_charge {
  detail_line_id BIGINT [pk, ref: > detail_line.dtl_id]
  amount DOUBLE
  description STRING
}

Table tender_media {
  detail_line_id BIGINT [pk, ref: > detail_line.dtl_id]
  payment_type STRING
  amount DOUBLE
}

Table error_code {
  detail_line_id BIGINT [pk, ref: > detail_line.dtl_id]
  code STRING
  message STRING
}


In [0]:
from pyspark.sql.functions import explode, col, monotonically_increasing_id

caminho_json = "/Volumes/workspace/default/data/ERP.json"
df = spark.read.option("multiline", "true").json(caminho_json)

df_guest_checks = df.withColumn("guestCheck", explode("guestChecks")).select("guestCheck.*")

df_guest_check = df_guest_checks.withColumn("guest_check_id", col("guestCheckId").cast("long"))

df_detail_lines = df_guest_check.withColumn("detailLine", explode("detailLines")).select(
    col("guest_check_id"),
    col("detailLine.*")
)

df_menu_item = df_detail_lines.select(
    col("dtlId").alias("detail_line_id"),
    col("menuItem.*")
).withColumnRenamed("miNum", "menu_item_id").dropDuplicates(["menu_item_id"])

df_tax = df_guest_check.withColumn("tax", explode("taxes")).select(
    col("guest_check_id"),
    col("tax.taxNum"),
    col("tax.taxCollTtl"),
    col("tax.taxRate")
).withColumn("tax_id", monotonically_increasing_id())


df_discount = df_detail_lines.filter(col("discount").isNotNull()).select(
    col("dtlId").alias("detail_line_id"),
    col("discount.*")
)

df_service_charge = df_detail_lines.filter(col("serviceCharge").isNotNull()).select(
    col("dtlId").alias("detail_line_id"),
    col("serviceCharge.*")
)

df_tender_media = df_detail_lines.filter(col("tenderMedia").isNotNull()).select(
    col("dtlId").alias("detail_line_id"),
    col("tenderMedia.*")
)

df_error_code = df_detail_lines.filter(col("errorCode").isNotNull()).select(
    col("dtlId").alias("detail_line_id"),
    col("errorCode.*")
)

print("guest_check schema")
df_guest_check.printSchema()
df_guest_check.show(5, truncate=False)

print("detail_line schema")
df_detail_lines.printSchema()
df_detail_lines.show(5, truncate=False)

print("menu_item schema")
df_menu_item.printSchema()
df_menu_item.show(5, truncate=False)

print("tax schema")
df_tax.printSchema()
df_tax.show(5, truncate=False)

df_guest_check.write.format("delta").mode("overwrite").saveAsTable("guest_check")
df_detail_lines.write.format("delta").mode("overwrite").saveAsTable("detail_line")
df_menu_item.write.format("delta").mode("overwrite").saveAsTable("menu_item")
df_tax.write.format("delta").mode("overwrite").saveAsTable("tax")

df_discount.write.format("delta").mode("overwrite").saveAsTable("discount")
df_service_charge.write.format("delta").mode("overwrite").saveAsTable("service_charge")
df_tender_media.write.format("delta").mode("overwrite").saveAsTable("tender_media")
df_error_code.write.format("delta").mode("overwrite").saveAsTable("error_code")


guest_check schema
root
 |-- balDueTtl: string (nullable = true)
 |-- chkNum: long (nullable = true)
 |-- chkTtl: double (nullable = true)
 |-- clsdBusDt: string (nullable = true)
 |-- clsdFlag: boolean (nullable = true)
 |-- clsdLcl: string (nullable = true)
 |-- clsdUTC: string (nullable = true)
 |-- detailLines: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- aggQty: long (nullable = true)
 |    |    |-- aggTtl: double (nullable = true)
 |    |    |-- busDt: string (nullable = true)
 |    |    |-- chkEmpId: long (nullable = true)
 |    |    |-- chkEmpNum: long (nullable = true)
 |    |    |-- detailLcl: string (nullable = true)
 |    |    |-- detailUTC: string (nullable = true)
 |    |    |-- dspQty: long (nullable = true)
 |    |    |-- dspTtl: double (nullable = true)
 |    |    |-- dtlId: long (nullable = true)
 |    |    |-- dtlOcNum: string (nullable = true)
 |    |    |-- dtlOtNum: long (nullable = true)
 |    |    |-- guestCheckLineItemI

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-6075970135109178>, line 79[0m
[1;32m     77[0m df_guest_check[38;5;241m.[39mwrite[38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mdelta[39m[38;5;124m"[39m)[38;5;241m.[39mmode([38;5;124m"[39m[38;5;124moverwrite[39m[38;5;124m"[39m)[38;5;241m.[39msaveAsTable([38;5;124m"[39m[38;5;124mguest_check[39m[38;5;124m"[39m)
[1;32m     78[0m df_detail_lines[38;5;241m.[39mwrite[38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mdelta[39m[38;5;124m"[39m)[38;5;241m.[39mmode([38;5;124m"[39m[38;5;124moverwrite[39m[38;5;124m"[39m)[38;5;241m.[39msaveAsTable([38;5;124m"[39m[38;5;124mdetail_line[39m[38;5;124m"[39m)
[0;32m---> 79[0m df_menu_item[38;5;241m.[39mwrite[38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mdelta[39m[38;5;124m"[39m)[38;5;241m.[39mmode(

In [0]:
%sql
select * from detail_line;
select * from menu_item;
select * from tax;

guest_check_id,taxNum,taxCollTtl,taxRate
1122334455,28,20.81,21
