In [0]:
import re
from datetime import datetime

rel_path = "airport_full_20251201.txt"
match = re.search(r'20\d{2}(0[1-9]|1[0-2])(0[1-9]|[12][0-9]|3[01])', rel_path)
yyyymmdd = match.group(0) if match else None

try:
    file_date = datetime.strptime(yyyymmdd, "%Y%m%d").date() if yyyymmdd else None
except Exception:
    file_date = None
display(file_date)

In [0]:
from pyspark.sql.functions import col

rel_path1 = "airport_full_20251201.txt"
rel_path2 = "airport_full_20251202.txt"
rel_path3 = "airport_full_20251203.txt"

df1 = spark.read.option("header", "true").option("sep", ",").csv(f"/Volumes/demos_standard/public_data/public_raw/Airport_Batches/{rel_path1}")
df2 = spark.read.option("header", "true").option("sep", ",").csv(f"/Volumes/demos_standard/public_data/public_raw/Airport_Batches/{rel_path2}")
df3 = spark.read.option("header", "true").option("sep", ",").csv(f"/Volumes/demos_standard/public_data/public_raw/Airport_Batches/{rel_path3}")

df2_ins = df2.join(
    df1,
    on="ident",
    how="left_anti"
)
df2_del = df1.join(
    df2,
    on="ident",
    how="left_anti"
)

df3_ins = df3.join(
    df2,
    on="ident",
    how="left_anti"
)
df3_del = df2.join(
    df3,
    on="ident",
    how="left_anti"
)

# Find changed rows: same ident, any field difference
join_cols = [c for c in df1.columns if c == "ident"]
other_cols = [c for c in df1.columns if c != "ident"]

df2_chg = df2.alias("d2").join(
    df1.alias("d1"),
    on="ident",
    how="inner"
).where(
    " OR ".join([f"d2.{c} != d1.{c} OR (d2.{c} IS NULL AND d1.{c} IS NOT NULL) OR (d2.{c} IS NOT NULL AND d1.{c} IS NULL)" for c in other_cols])
).select("d2.*")

df3_chg = df3.alias("d3").join(
    df2.alias("d2"),
    on="ident",
    how="inner"
).where(
    " OR ".join([f"d3.{c} != d2.{c} OR (d3.{c} IS NULL AND d2.{c} IS NOT NULL) OR (d3.{c} IS NOT NULL AND d2.{c} IS NULL)" for c in other_cols])
).select("d3.*")



print(f"df1: {df1.count()}")
print(f"df2: {df2.count()}")
print(f"df3: {df3.count()}")

print(f"df2_ins : {df2_ins.count()}")
print(f"df2_del : {df2_del.count()}")
print(f"df2_chg : {df2_chg.count()}")

# display(df2_ins)
# display(df2_del)
# display(df2_chg)

print(f"df3_ins : {df3_ins.count()}")
print(f"d3_del : {df3_del.count()}")
print(f"df3_chg : {df3_chg.count()}")

In [0]:
%python
%pip install py7zr

In [0]:
import py7zr

full_path_7z = "/Volumes/demos_standard/public_data/public_raw/airport-codes.7z"
extract_dir = "/Volumes/demos_standard/public_data/public_raw/tmp/airport_codes_extract"

with py7zr.SevenZipFile(full_path_7z, mode='r') as archive:
    archive.extractall(path=extract_dir)

import os
csv_file = [f for f in os.listdir(extract_dir) if f.endswith('.csv')][0]
csv_path = os.path.join(extract_dir, csv_file)

df_all = spark.read.option("header", "true").option("sep", ",").csv(csv_path)
print(df_all.count())

In [0]:
print(f"Total records: {df_all.count()}")
print(f"Distinct ident count: {df_all.select('ident').distinct().count()}")