# Small dataset

In [0]:
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

# Recreate the small target table: the departure-delays CSV plus a sequential surrogate `id`,
# stored as Delta and partitioned by `origin`.
# NOTE(review): a window with no partitionBy is evaluated in a single partition; acceptable here.
# NOTE(review): (date, origin, destination) may not be unique, so tie order for `id` is not guaranteed.
flights_path = "/mnt/filesystem/delta_antypatterns/facts_and_merge/flights"
dbutils.fs.rm(flights_path, recurse=True)

windowSpec = Window.orderBy("date", "origin", "destination")
df = (
  spark.read.csv("/databricks-datasets/flights/departuredelays.csv", header=True, inferSchema=True)
  .withColumn("id", row_number().over(windowSpec))
)

(df.write
  .format("delta")
  .mode("overwrite")
  .partitionBy("origin")
  .save(flights_path))

### Update 2 partitions (ATL and DFW, even `date` values only)

In [0]:
from pyspark.sql.functions import lit, row_number
from pyspark.sql.window import Window

# Staging set for the 2-partition update: ATL/DFW flights with an even `date`, distance zeroed.
# The id window is computed over the whole CSV *before* filtering, so staged rows get the
# same ids the target table was written with (assuming stable tie-breaking — TODO confirm).
windowSpec = Window.orderBy("date", "origin", "destination")

raw_flights = spark.read.csv(
  "/databricks-datasets/flights/departuredelays.csv", header=True, inferSchema=True
)
df = (
  raw_flights
  .withColumn("id", row_number().over(windowSpec))
  .withColumn("distance", lit(0))
  .filter("origin IN ('ATL', 'DFW') AND date%2 = 0")
)
df.createOrReplaceTempView("stg_flights")

In [0]:
%sql
-- Intentionally disabled (the statement is commented out, so this cell does nothing).
-- Uncomment to cache the stg_flights view before running the MERGE below.
--CACHE TABLE stg_flights

In [0]:
%sql
-- Upsert the staged ATL/DFW rows into the small table. The extra predicate on the
-- partition column lets Delta restrict the target scan to the ATL and DFW partitions.
MERGE INTO delta.`/mnt/filesystem/delta_antypatterns/facts_and_merge/flights` AS tgt
USING stg_flights AS src
  ON src.id = tgt.id
 AND tgt.origin IN ('ATL', 'DFW')
WHEN MATCHED THEN
  UPDATE SET
    tgt.date        = src.date,
    tgt.delay       = src.delay,
    tgt.distance    = src.distance,
    tgt.origin      = src.origin,
    tgt.destination = src.destination
WHEN NOT MATCHED THEN
  INSERT *

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
729562,729562,0,0


In [0]:
# Alternative to MERGE for the 2-partition update: rebuild only the ATL/DFW partitions
# (staged rows + every existing ATL/DFW row whose id is not being replaced) and swap
# them in with replaceWhere.
# Uses the same table that the "Small dataset" cell created and the MERGE above updated;
# previously this pointed at .../delta_disjoint_predicates/flights, an unrelated table.
flights_path = "/mnt/filesystem/delta_antypatterns/facts_and_merge/flights"

# UNION ALL: the anti join makes the two sides' ids disjoint, so the DISTINCT implied by a
# plain UNION would only add an extra aggregation/shuffle (matches the bigger-dataset cell).
df_part_merge = spark.sql(f"""
    SELECT * FROM stg_flights AS sf

    UNION ALL

    SELECT f.*
    FROM delta.`{flights_path}` AS f
      LEFT ANTI JOIN stg_flights AS sf
      ON sf.id = f.id
    WHERE f.origin IN ('ATL', 'DFW')
""")

(df_part_merge.write
  .format("delta")
  .mode("overwrite")
  # Only the partitions matching this predicate are replaced; every written row must satisfy it.
  .option("replaceWhere", "origin IN ('ATL', 'DFW')")
  .save(flights_path))


### Update all partitions (even `date` values only)

In [0]:
from pyspark.sql.functions import lit, row_number
from pyspark.sql.window import Window

# Staging set for the all-partitions update: every flight with an even `date`, distance zeroed.
# Ids come from the same window over the full CSV, computed before the filter, so they line up
# with the target table's ids (assuming stable tie-breaking — TODO confirm).
windowSpec = Window.orderBy("date", "origin", "destination")

raw_flights = spark.read.csv(
  "/databricks-datasets/flights/departuredelays.csv", header=True, inferSchema=True
)
df = (
  raw_flights
  .withColumn("id", row_number().over(windowSpec))
  .withColumn("distance", lit(0))
  .filter("date%2 = 0")
)
df.createOrReplaceTempView("stg_flights")

In [0]:
%sql
-- Upsert the staged rows into the small table with no partition predicate, so the
-- target scan is not restricted to particular origins.
MERGE INTO delta.`/mnt/filesystem/delta_antypatterns/facts_and_merge/flights` AS tgt
USING stg_flights AS src
  ON src.id = tgt.id
WHEN MATCHED THEN
  UPDATE SET
    tgt.date        = src.date,
    tgt.delay       = src.delay,
    tgt.distance    = src.distance,
    tgt.origin      = src.origin,
    tgt.destination = src.destination
WHEN NOT MATCHED THEN
  INSERT *

In [0]:
# Alternative to MERGE for the all-partitions update: rebuild the whole table as the staged
# rows plus every existing row whose id is not being replaced, then overwrite it.
# Uses the same table that the "Small dataset" cell created; previously this pointed at
# .../delta_disjoint_predicates/flights, an unrelated table.
flights_path = "/mnt/filesystem/delta_antypatterns/facts_and_merge/flights"

# UNION ALL: ids on the two sides are disjoint, so a plain UNION's DISTINCT is wasted work.
df_part_merge = spark.sql(f"""
    SELECT * FROM stg_flights AS sf

    UNION ALL

    SELECT f.*
    FROM delta.`{flights_path}` AS f
      LEFT ANTI JOIN stg_flights AS sf
      ON sf.id = f.id
""")

# Full overwrite. The result spans every origin, so the previous
# replaceWhere "origin IN ('ATL', 'DFW')" made Delta reject the write (rows outside the
# predicate). This matches the bigger-dataset version of this cell.
(df_part_merge.write
  .format("delta")
  .mode("overwrite")
  .save(flights_path))


# Bigger dataset (x100)

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Build the bigger target table by appending the departure-delays CSV N_COPIES times,
# partitioned by `origin`. Copy k (0-based) gets ids k*N+1 .. (k+1)*N, so ids stay unique
# across copies; copy 0 gets ids 1..N, the range the staging cells below generate.
flights_100_path = "/mnt/filesystem/delta_antypatterns/facts_and_merge/flights_100"
# The section is "x100"; the previous `range(1, 100)` loop appended only 99 copies.
N_COPIES = 100

dbutils.fs.rm(flights_100_path, recurse=True)
windowSpec = Window.orderBy(["date", "origin", "destination"])

df = spark.read.format("csv").option("header", True).option("inferSchema", True).load("/databricks-datasets/flights/departuredelays.csv")
# Rows per copy, counted once (the old loop re-ran df.count(), a full CSV scan, every iteration).
rows_per_copy = df.count()

for copy_idx in range(N_COPIES):
  max_rn = rows_per_copy * copy_idx  # id offset for this copy
  # Build each copy from the untouched source frame instead of reassigning `df`.
  df_copy = df.withColumn("id", row_number().over(windowSpec) + max_rn)
  df_copy.write.format("delta").mode("append").partitionBy("origin").save(flights_100_path)

### Update 2 partitions (ATL and DFW, even `date` values only)

In [0]:
from pyspark.sql.functions import lit, row_number
from pyspark.sql.window import Window

# Staging set for the 2-partition update on the bigger table: ATL/DFW flights with an even
# `date`, distance zeroed. Ids come from the un-offset window over one copy of the CSV,
# computed before filtering, so they target the first appended copy (ids 1..N).
windowSpec = Window.orderBy("date", "origin", "destination")

raw_flights = spark.read.csv(
  "/databricks-datasets/flights/departuredelays.csv", header=True, inferSchema=True
)
df = (
  raw_flights
  .withColumn("id", row_number().over(windowSpec))
  .withColumn("distance", lit(0))
  .filter("origin IN ('ATL', 'DFW') AND date%2 = 0")
)
df.createOrReplaceTempView("stg_flights")

In [0]:
%sql
-- Upsert the staged ATL/DFW rows into the bigger table; the partition-column predicate lets
-- Delta restrict the target scan to the ATL and DFW partitions.
MERGE INTO delta.`/mnt/filesystem/delta_antypatterns/facts_and_merge/flights_100` AS tgt
USING stg_flights AS src
  ON src.id = tgt.id
 AND tgt.origin IN ('ATL', 'DFW')
WHEN MATCHED THEN
  UPDATE SET
    tgt.date        = src.date,
    tgt.delay       = src.delay,
    tgt.distance    = src.distance,
    tgt.origin      = src.origin,
    tgt.destination = src.destination
WHEN NOT MATCHED THEN
  INSERT *

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
82196,82196,0,0


In [0]:
# Alternative to MERGE for the 2-partition update on the bigger table: rebuild only the
# ATL/DFW partitions (staged rows + existing ATL/DFW rows whose id is not staged) and
# swap them in with replaceWhere.
part_merge_sql = """
          select * from stg_flights as sf

          union all

          select * from delta.`/mnt/filesystem/delta_antypatterns/facts_and_merge/flights_100` as f
            left anti join stg_flights as sf
            on sf.id = f.id
          where origin in ('ATL', 'DFW')
      """
df_part_merge = spark.sql(part_merge_sql)

(df_part_merge.write
  .format("delta")
  .mode("overwrite")
  # Only the partitions matching this predicate are replaced.
  .option("replaceWhere", "origin IN ('ATL', 'DFW')")
  .save("/mnt/filesystem/delta_antypatterns/facts_and_merge/flights_100"))

### Update all partitions (even `date` values only)

In [0]:
from pyspark.sql.functions import lit, row_number
from pyspark.sql.window import Window

# Staging set for the all-partitions update on the bigger table: every flight with an even
# `date`, distance zeroed. The un-offset window (computed before filtering) yields ids 1..N,
# i.e. the first appended copy.
windowSpec = Window.orderBy("date", "origin", "destination")

raw_flights = spark.read.csv(
  "/databricks-datasets/flights/departuredelays.csv", header=True, inferSchema=True
)
df = (
  raw_flights
  .withColumn("id", row_number().over(windowSpec))
  .withColumn("distance", lit(0))
  .filter("date%2 = 0")
)
df.createOrReplaceTempView("stg_flights")

In [0]:
%sql
-- Upsert the staged rows into the bigger table with no partition predicate, so the
-- target scan is not restricted to particular origins.
MERGE INTO delta.`/mnt/filesystem/delta_antypatterns/facts_and_merge/flights_100` AS tgt
USING stg_flights AS src
  ON src.id = tgt.id
WHEN MATCHED THEN
  UPDATE SET
    tgt.date        = src.date,
    tgt.delay       = src.delay,
    tgt.distance    = src.distance,
    tgt.origin      = src.origin,
    tgt.destination = src.destination
WHEN NOT MATCHED THEN
  INSERT *

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
729562,729562,0,0


In [0]:
# Alternative to MERGE for the all-partitions update on the bigger table: rebuild the whole
# table as staged rows plus every existing row whose id is not staged, then overwrite it.
part_merge_sql = """
          select * from stg_flights as sf

          union all

          select * from delta.`/mnt/filesystem/delta_antypatterns/facts_and_merge/flights_100` as f
            left anti join stg_flights as sf
            on sf.id = f.id
      """
df_part_merge = spark.sql(part_merge_sql)

# Plain overwrite of the entire table (no replaceWhere: the result covers every origin).
(df_part_merge.write
  .format("delta")
  .mode("overwrite")
  .save("/mnt/filesystem/delta_antypatterns/facts_and_merge/flights_100"))