# Sprawdzenie dostępności bibliotek Delta Lake

Na początku wykonaj poniższe paragrafy tworzące tabelę Delta Lake, jeśli wszystko się powiedzie możesz przejść do sekcji ***Wprowadzenie*** pominąć następną sekcję poświęconą konfiguracji. 
W przeciwnym razie wykonaj polecenia z sekcji ***Konfiguracja***.

In [1]:
import pyspark
from delta import *
from pyspark.sql.functions import col, explode, array


builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.databricks.delta.schema.autoMerge.enabled", "true")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

:: loading settings :: url = jar:file:/usr/lib/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-1b6637d4-72b7-48e8-97fd-8f865280cff1;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.3.0 in central
	found io.delta#delta-storage;2.3.0 in central
	found org.antlr#antlr4-runtime;4.8 in central
downloading https://repo1.maven.org/maven2/io/delta/delta-core_2.12/2.3.0/delta-core_2.12-2.3.0.jar ...
	[SUCCESSFUL ] io.delta#delta-core_2.12;2.3.0!delta-core_2.12.jar (142ms)
downloading https://repo1.maven.org/maven2/io/delta/delta-storage/2.3.0/delta-storage-2.3.0.jar ...
	[SUCCESSFUL ] io.delta#delta-storage;2.3.0!delta-storage.jar (38ms)
downloading https://repo1.maven.org/maven2/org/antlr/antlr4-runtime/4.8/antlr4-runtime-4.8.jar ...
	[SUCCESSFUL ] org.antlr#antlr4-runtime;4.8!antlr4-runtime.jar (41ms)
:: resolution report :: resolve 1499ms :: artifacts dl 229ms
	:: module

In [2]:
data = (
    spark.range(0, 5)
    .selectExpr("id as x")
    .withColumn("y", explode(array(col("x"))))
    .select("x", "y")
)

# Zapis danych do formatu Delta
data.write.format("delta").save("/tmp/delta-table2")

23/12/15 09:07:59 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

# Wprowadzenie

## Skonfigurowanie danych źródłowych 

Zanim zaczniemy korzystać i poznawać funkcjonalności biblioteki Delta Lake skonfigurujmy tabelę z danymi źródłowymi. 
A następnie uruchom go tworząc tymczasową perspektywę na przygotowanych uprzednio danych źródłowych. 

In [3]:
source_df = (
    spark.read
    .option("header", True)
    .option("quote", "\"")
    .csv("/tmp/DeltaLakeSourceData")
)

# Wyświetlenie schematu ramki danych
source_df.printSchema()

# Utworzenie tymczasowej tabeli
source_df.createOrReplaceTempView("source_data")

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- zipcode: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- effectiveDate: string (nullable = true)



 
Nasze dane źródłowe zawierają informacje na temat klientów z kolejnych okresów czasu.

Uruchom poniższe zapytanie wydobywające wersje danych, które były aktualne na dzień `2021-01-01`.

In [17]:
df = spark.sql("""
select id, name, address, zipcode, city, country, effectiveDate
from  (
    select id, name, address, zipcode, city, country, effectiveDate, 
           rank() over (partition by id order by to_date(effectiveDate,"dd-MM-yyyy") desc) as version
    from   source_data
    where  to_date(effectiveDate,"dd-MM-yyyy") < to_date("2021-01-01","yyyy-MM-dd")
    ) tab
where version = 1""")

In [19]:
df.toPandas()

Unnamed: 0,id,name,address,zipcode,city,country,effectiveDate
0,10,Jin Terry,467-8297 Enim,35633573,Balıkesir,Nigeria,06-11-2020
1,100,Harriet Rojas,Ap #810-8710 Enim. St.,84541,Lipetsk,Canada,21-10-2020
2,11,Isabelle Stevenson,131-4245 Eleifend. Street,16142,Hà Giang,Russian Federation,25-11-2020
3,14,Leo Mcleod,467-8297 Enim,39153,Borås,Germany,16-10-2020
4,16,Kaitlin Landry,623-5682 Augue St.,351225,Libramont-Chevigny,Indonesia,29-10-2020
5,19,Alden Harper,Ap #579-2185 Sed Street,94671-72608,Châtellerault,Nigeria,06-12-2020
6,2,Brandon Christian,476-5064 Suspendisse Rd.,93-765,Broxburn,Russian Federation,28-11-2020
7,20,Kathleen Pugh,7018 Cras St.,3123,Ostrowiec Świętokrzyski,Peru,16-11-2020
8,26,Ulysses Dillard,1318 Tempor Rd.,S5J 6Z2,Tuscaloosa,United States,11-10-2020
9,28,Shaine Puckett,Ap #579-2185 Sed Street,85629,Cochrane,Poland,07-12-2020


W poniższych zadaniach możesz skorzystać zarówno z interfejsu w Scali jak i SQL. 

Wszystko zależy od Twoich preferencji.

Uwaga! W przypadku korzystania z SQL i wskazywania ścieżek, konieczne jest wykorzystywanie schematu `hdfs`.

Przykładowo `hdfs:/tmp/delta-customers`

## DDL

### Zadanie 1

Utwórz pustą tabelę *Delta Lake* o nazwie `customers`, której lokalizacją będzie `/tmp/delta-customers` (lub `hdfs:/tmp/delta-customers`). 

Kolumny tabeli muszą odpowiadać kolumnom danych źródłowych. 
Wszystkie kolumny powinny być ciągami znaków o długości do 200 znaków

Jeśli uruchamiasz ten notatnik po raz kolejny. Usuń zawartość katalogu z danymi tabeli Delta Lake wywołując poniższe polecenie

In [6]:
%%sh 
hadoop fs -rm -r /tmp/delta-customers

rm: `/tmp/delta-customers': No such file or directory


CalledProcessError: Command 'b'hadoop fs -rm -r /tmp/delta-customers\n'' returned non-zero exit status 1.

### Rozwiązanie zadania 1

In [7]:
spark.sql("DROP TABLE IF EXISTS customers")

ivysettings.xml file not found in HIVE_HOME or HIVE_CONF_DIR,/etc/hive/conf.dist/ivysettings.xml will be used


DataFrame[]

In [8]:
spark.sql("""CREATE TABLE
customers
 (
  id   VARCHAR(200),
  name VARCHAR(200), 
  address VARCHAR(200), 
  zipcode VARCHAR(200), 
  city VARCHAR(200), 
  country VARCHAR(200), 
  effectiveDate VARCHAR(200)
  )
USING DELTA
LOCATION 'hdfs:/tmp/delta-customers';""")

23/12/15 09:15:14 WARN HiveExternalCatalog: Couldn't find corresponding Hive SerDe for data source provider delta. Persisting data source table `default`.`customers` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.
23/12/15 09:15:14 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.


DataFrame[]

## DML

### Zadanie 2

Wprowadź do utworzonej przez Ciebie tabeli dane o naszych klientach obowiązujące na dzień `2021-01-01`.

### Rozwiązanie zadania 2

In [9]:
spark.sql("""
INSERT INTO
customers
(id, name, address, zipcode, city, country, effectiveDate)
select
 id, name, address, zipcode, city, country, effectiveDate
from  (
    select id, name, address, zipcode, city, country, effectiveDate, 
           rank() over (partition by id order by to_date(effectiveDate,"dd-MM-yyyy") desc) as version
    from source_data
    where  to_date(effectiveDate,"dd-MM-yyyy") < date "2021-01-01"
    )
where version = 1;""")

                                                                                

DataFrame[]

### Zadanie 3

Zmień kraj zamieszkania na wartość `Poland` u wszystkich klientów posiadających wartość id mniejszą niż 50. 

Zwróć uwagę, że obecnie id jest ciągiem znaków. Użyj wyrażenia `cast(id as int)` aby wykonać zadanie prawidłowo.

### Rozwiązanie zadania 3

In [10]:
spark.sql("""
update customers
set country = 'Poland'
where
 cast(id as int) < 50;""")

                                                                                

DataFrame[num_affected_rows: bigint]

Na chwilę się zatrzymajmy. Utworzenie tabeli to wpis w metadanych. Wprowadzenie nowych danych to utworzenie nowych plików w katalogu tabeli. Czym było zmodyfikowanie tych danych? 

Wykonaj poniższe polecenie, aby przyglądnąć się zawartości katalogu należącego do tabeli `customers`

In [13]:
%%sh
hadoop fs -ls /tmp/delta-customers

Found 3 items
drwxr-xr-x   - root hadoop          0 2023-12-15 09:20 /tmp/delta-customers/_delta_log
-rw-r--r--   2 root hadoop       4473 2023-12-15 09:18 /tmp/delta-customers/part-00000-543056d5-009b-429a-b7cb-ceb7abbea198-c000.snappy.parquet
-rw-r--r--   2 root hadoop       4424 2023-12-15 09:20 /tmp/delta-customers/part-00000-c9a522ce-35d9-44e4-8996-a8317ea9a4e3-c000.snappy.parquet


Mamy katalog z logiem transakcyjnym i dwa pliki, które prawie nie różnią się wielkością.

Spróbujmy zrozumieć znaczenie tych plików zaglądając do historii zmian w naszej tabeli. 
Koniecznie przestudiuj kolumnę `operationMetrics`

In [14]:
df = spark.sql("describe history customers")

                                                                                

In [15]:
df.toPandas()

Unnamed: 0,version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
0,2,2023-12-15 09:20:16.468,,,UPDATE,{'predicate': '(cast(id#1176 as int) < 50)'},,,,1.0,Serializable,False,"{'numRemovedBytes': '4473', 'numAddedFiles': '...",,Apache-Spark/3.3.2 Delta-Lake/2.3.0
1,1,2023-12-15 09:18:35.540,,,WRITE,"{'mode': 'Append', 'partitionBy': '[]'}",,,,0.0,Serializable,True,"{'numOutputRows': '32', 'numOutputBytes': '447...",,Apache-Spark/3.3.2 Delta-Lake/2.3.0
2,0,2023-12-15 09:15:09.288,,,CREATE TABLE,"{'description': None, 'partitionBy': '[]', 'pr...",,,,,Serializable,True,{},,Apache-Spark/3.3.2 Delta-Lake/2.3.0


Poniższe zapytanie dostarcza danych o klientach, 
którzy pojawili się jako nowi, lub zmienili swoje dane w styczniu 2021.

In [20]:
df = spark.sql("""
select id, name, address, zipcode, city, country, effectiveDate
from  (
    select id, name, address, zipcode, city, country, effectiveDate, 
           rank() over (partition by id order by to_date(effectiveDate,"dd-MM-yyyy") desc) as version
    from   source_data
    where  to_date(effectiveDate,"dd-MM-yyyy") >= date "2021-01-01"
    and    to_date(effectiveDate,"dd-MM-yyyy") < date "2021-02-01"
    )
where version = 1""")

In [21]:
df.toPandas()

Unnamed: 0,id,name,address,zipcode,city,country,effectiveDate
0,100,Vance Palmer,"P.O. Box 221, 1718 Sociis Rd.",525734,Camarones,Poland,23-01-2021
1,14,Pearl Ward,401-3122 Aliquam Av.,4449,Elbistan,Spain,02-01-2021
2,21,Desirae Morin,Ap #675-9646 Ridiculus Avenue,22382,Bergen op Zoom,New Zealand,30-01-2021
3,27,Jerome Hines,598-974 Convallis Av.,10783,Tharparkar,Colombia,15-01-2021
4,29,Darius Cole,Ap #412-3424 Eu St.,12178,Birecik,New Zealand,30-01-2021
5,4,Robin Hartman,960-7120 Lectus Rd.,833082,Fundación,Peru,26-01-2021
6,44,Kasimir Irwin,156-1322 Nulla. Road,50218,Hồ Chí Minh City,Canada,30-01-2021
7,49,Justin Burch,806-9586 Quis Rd.,83477-576,Sokoto,Sweden,06-01-2021
8,59,Coby Blackwell,311-203 Ipsum St.,249414,Belfast,Mexico,13-01-2021
9,6,Jaime Dillon,8224 Amet Road,53604,Korneuburg,Peru,17-01-2021


### Zadanie 4

Chcemy zaktualizować dane naszych klientów w oparciu o ich styczniowe wersje. Zrobimy to w dwóch krokach

1. Usuniemy z tabeli `customers` dane już nieaktualne, a następnie 
2. Wstawimy do niej dane zgodne ze styczniowymi zmianami

Usuń z tabeli `customers` tych klientów, którzy zmienili swoje dane w styczniu 2021. Identyfikacja klientów odbywać się powinna za każdym razem w oparciu o atrybut `id`.

Jeśli okaże się, że polecenie `DELETE` nie wspiera podzapytań, skorzystaj z interfejsu w Scali,
lub za pomocą poniższego kodu, w dodatkowym paragrafie uzyskaj identyfikatory klientów do usunięcia, a następnie wkomponuj uzyskaną wartość w polecenie usuwające klientów.

```python
usun_ids_df = spark.sql("""
    SELECT id AS usun
    FROM (
        SELECT
            id, name, address, zipcode, city, country, effectiveDate,
            RANK() OVER (PARTITION BY id ORDER BY to_date(effectiveDate, 'dd-MM-yyyy') DESC) AS version
        FROM source_data
        WHERE to_date(effectiveDate, 'dd-MM-yyyy') >= '2021-01-01'
            AND to_date(effectiveDate, 'dd-MM-yyyy') < '2021-02-01'
    ) tab
    WHERE version = 1
""")

# Pobranie wyników jako listy
usun_ids_list = usun_ids_df.select("usun").rdd.flatMap(lambda x: x).collect()

# Konwersja do ciągu znaków
usun_ids_str = ",".join(map(str, usun_ids_list))
```

### Rozwiązanie zadania 4 - dodatkowy paragraf

In [22]:
usun_ids_df = spark.sql("""
    SELECT id AS usun
    FROM (
        SELECT
            id, name, address, zipcode, city, country, effectiveDate,
            RANK() OVER (PARTITION BY id ORDER BY to_date(effectiveDate, 'dd-MM-yyyy') DESC) AS version
        FROM source_data
        WHERE to_date(effectiveDate, 'dd-MM-yyyy') >= '2021-01-01'
            AND to_date(effectiveDate, 'dd-MM-yyyy') < '2021-02-01'
    ) tab
    WHERE version = 1
""")

# Pobranie wyników jako listy
usun_ids_list = usun_ids_df.select("usun").rdd.flatMap(lambda x: x).collect()

# Konwersja do ciągu znaków
usun_ids_str = ",".join(map(str, usun_ids_list))

                                                                                

In [23]:
usun_ids_str

'100,14,21,27,29,4,44,49,59,6,63,69,77,8,84'

### Rozwiązanie zadania 4

In [25]:
# Wykonanie zapytania SQL do usunięcia rekordów

spark.sql(f"""
DELETE FROM customers
where id IN ({usun_ids_str})
""")

                                                                                

DataFrame[num_affected_rows: bigint]

Za pomocą poniższego polecenia wstaw do tabeli `customers` dane klientów, 
którzy zmienili swoje dane w styczniu 2021.

In [26]:
spark.sql("""
INSERT INTO customers 
select id, name, address, zipcode, city, country, effectiveDate
from  (
    select id, name, address, zipcode, city, country, effectiveDate, 
           rank() over (partition by id order by to_date(effectiveDate,"dd-MM-yyyy") desc) as version
    from   source_data
    where  to_date(effectiveDate,"dd-MM-yyyy") >= date "2021-01-01"
    and    to_date(effectiveDate,"dd-MM-yyyy") < date "2021-02-01"
    )
where version = 1""")

                                                                                

DataFrame[]

Zanim przejdziemy dalej, ponownie zastanówmy się nad tym co działo się pod spodem. 

Spróbuj odpowiedzieć na dwa pytania:

1. Ile nowych plików przybyło po naszych dwóch operacjach `delete` i `insert`.
2. Ile plików będzie aktywnych - wykorzystywanych przy zapytaniach?

Znasz odpowiedzi? Sprawdźmy czy są one prawidłowe.

Wykonaj poniższe polecenia

In [27]:
%%sh
hadoop fs -ls /tmp/delta-customers

Found 5 items
drwxr-xr-x   - root hadoop          0 2023-12-15 09:31 /tmp/delta-customers/_delta_log
-rw-r--r--   2 root hadoop       3218 2023-12-15 09:30 /tmp/delta-customers/part-00000-320c6387-dd24-4a29-919b-d51de3e4c2e4-c000.snappy.parquet
-rw-r--r--   2 root hadoop       4473 2023-12-15 09:18 /tmp/delta-customers/part-00000-543056d5-009b-429a-b7cb-ceb7abbea198-c000.snappy.parquet
-rw-r--r--   2 root hadoop       4424 2023-12-15 09:20 /tmp/delta-customers/part-00000-c9a522ce-35d9-44e4-8996-a8317ea9a4e3-c000.snappy.parquet
-rw-r--r--   2 root hadoop       4433 2023-12-15 09:30 /tmp/delta-customers/part-00000-f4f0e584-f259-43eb-b5b8-e1eaff7c5006-c000.snappy.parquet


In [28]:
df = spark.sql("describe history customers")

                                                                                

In [29]:
df.toPandas()

Unnamed: 0,version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
0,4,2023-12-15 09:31:01.423,,,WRITE,"{'mode': 'Append', 'partitionBy': '[]'}",,,,3.0,Serializable,True,"{'numOutputRows': '15', 'numOutputBytes': '321...",,Apache-Spark/3.3.2 Delta-Lake/2.3.0
1,3,2023-12-15 09:30:19.830,,,DELETE,"{'predicate': '[""(spark_catalog.default.custom...",,,,2.0,Serializable,False,"{'numRemovedBytes': '4424', 'numAddedFiles': '...",,Apache-Spark/3.3.2 Delta-Lake/2.3.0
2,2,2023-12-15 09:20:16.468,,,UPDATE,{'predicate': '(cast(id#1176 as int) < 50)'},,,,1.0,Serializable,False,"{'numRemovedBytes': '4473', 'numAddedFiles': '...",,Apache-Spark/3.3.2 Delta-Lake/2.3.0
3,1,2023-12-15 09:18:35.540,,,WRITE,"{'mode': 'Append', 'partitionBy': '[]'}",,,,0.0,Serializable,True,"{'numOutputRows': '32', 'numOutputBytes': '447...",,Apache-Spark/3.3.2 Delta-Lake/2.3.0
4,0,2023-12-15 09:15:09.288,,,CREATE TABLE,"{'description': None, 'partitionBy': '[]', 'pr...",,,,,Serializable,True,{},,Apache-Spark/3.3.2 Delta-Lake/2.3.0


Powyższa kombinacja poleceń `delete` oraz `insert` w prosty sposób 
może zostać zastąpiona poleceniem `merge`.

### Zadanie 5

Korzystając z jednego polecenia `merge` jednocześnie
*   wstaw nowych klientów, którzy pojawili się po raz pierwszy w lutym 2021
*   zaktualizuj dane adresowe oraz `effectiveDate` tych klientów, którzy w tym samym czasie te dane zmienili. 

Poniższe polecenie uzyskuje dane z obu grup klientów:

```python
select id, name, address, zipcode, city, country, effectiveDate
from  (
    select id, name, address, zipcode, city, country, effectiveDate, 
           rank() over (partition by id order by to_date(effectiveDate,"dd-MM-yyyy") desc) as version
    from   source_data
    where  to_date(effectiveDate,"dd-MM-yyyy") >= date "2021-02-01"
    and    to_date(effectiveDate,"dd-MM-yyyy") < date "2021-03-01"
    )
where version = 1
```

### Rozwiązanie zadania 5

In [31]:
spark.sql("""
merge INTO customers
USING (select id, name, address, zipcode, city, country, effectiveDate
from  (
    select id, name, address, zipcode, city, country, effectiveDate, 
           rank() over (partition by id order by to_date(effectiveDate,"dd-MM-yyyy") desc) as version
    from   source_data
    where  to_date(effectiveDate,"dd-MM-yyyy") >= date "2021-02-01"
    and    to_date(effectiveDate,"dd-MM-yyyy") < date "2021-03-01"
    )
where version = 1) new_customers
ON customers.id = new_customers.id
WHEN matched
 THEN update
 SET customers.address = new_customers.address,
             customers.zipcode = new_customers.zipcode, 
             customers.city    = new_customers.city, 
             customers.country = new_customers.country,
             customers.effectiveDate = new_customers.effectiveDate
WHEN not matched
  THEN insert
 (id, name, address, zipcode, city, country, effectiveDate) 
values (id, name, address, zipcode, city, country, effectiveDate)
""")

23/12/15 09:46:43 WARN HintErrorLogger: Hint (strategy=broadcast) is not supported in the query: build left for full outer join.
23/12/15 09:46:43 WARN HintErrorLogger: Hint (strategy=broadcast) is not supported in the query: build left for full outer join.
23/12/15 09:46:44 WARN HintErrorLogger: Hint (strategy=broadcast) is not supported in the query: build left for full outer join.
                                                                                

DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

Za pomocą poniższego polecenia sprawdź, z których miesięcy pochodzą dane naszych klientów.

In [32]:
df = spark.sql("""
select substr(effectiveDate,4,10) as month,
       count(*) as how_many
from customers
group by substr(effectiveDate,4,10)
order by to_date(substr(effectiveDate,4,10),"MM-yyyy")""")

In [33]:
df.toPandas()

                                                                                

Unnamed: 0,month,how_many
0,10-2020,6
1,11-2020,12
2,12-2020,7
3,01-2021,15
4,02-2021,8


## Struktury plików, transakcje

Przed chwilą wykonaliśmy szereg operacji DML na naszej tabeli *customers*. 
Każda z nich, z jednej strony była oddzielną transakcją zapisaną w logach Delta Lake, z drugiej strony każda z nich dokonała pewnych zmian w plikach naszej tabeli.  

Za pomocą kilku kolejnych paragrafów rozglądniemy się na początku po strukturze plików, potem postaramy się wydobyć informacje na temat naszych transakcji.

In [34]:
# w kolumnie location znajdziesz katalog, w którym nasza tabela jest przechowywana
df = spark.sql("describe detail customers")

In [35]:
df.toPandas()

Unnamed: 0,format,id,name,description,location,createdAt,lastModified,partitionColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion,tableFeatures
0,delta,0483707e-4829-4fe6-b9be-c38cea8eea17,default.customers,,hdfs://pbd-cluster-m/tmp/delta-customers,2023-12-15 09:15:09.166,2023-12-15 09:46:45.373,[],2,7749,{},1,2,"[appendOnly, invariants]"


Sprawdź ile plików znajduje się w tym katalogu

Sprawdź ile plików ma najstarsze daty - powstało gdy po raz pierwszy załadowaliśmy do tabeli dane

W tym celu wykonaj poniższe polecenia:

In [36]:
%%sh 
hadoop fs -ls -t /tmp/delta-customers

Found 6 items
drwxr-xr-x   - root hadoop          0 2023-12-15 09:46 /tmp/delta-customers/_delta_log
-rw-r--r--   2 root hadoop       4531 2023-12-15 09:46 /tmp/delta-customers/part-00000-61197ba9-6b99-4f72-a36b-07c749f0e2e6-c000.snappy.parquet
-rw-r--r--   2 root hadoop       3218 2023-12-15 09:30 /tmp/delta-customers/part-00000-320c6387-dd24-4a29-919b-d51de3e4c2e4-c000.snappy.parquet
-rw-r--r--   2 root hadoop       4433 2023-12-15 09:30 /tmp/delta-customers/part-00000-f4f0e584-f259-43eb-b5b8-e1eaff7c5006-c000.snappy.parquet
-rw-r--r--   2 root hadoop       4424 2023-12-15 09:20 /tmp/delta-customers/part-00000-c9a522ce-35d9-44e4-8996-a8317ea9a4e3-c000.snappy.parquet
-rw-r--r--   2 root hadoop       4473 2023-12-15 09:18 /tmp/delta-customers/part-00000-543056d5-009b-429a-b7cb-ceb7abbea198-c000.snappy.parquet


In [37]:
df = spark.sql("describe history customers")

In [38]:
df.toPandas()

Unnamed: 0,version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
0,5,2023-12-15 09:46:45.373,,,MERGE,"{'matchedPredicates': '[{""actionType"":""update""...",,,,4.0,Serializable,False,"{'numOutputRows': '33', 'numTargetBytesAdded':...",,Apache-Spark/3.3.2 Delta-Lake/2.3.0
1,4,2023-12-15 09:31:01.423,,,WRITE,"{'mode': 'Append', 'partitionBy': '[]'}",,,,3.0,Serializable,True,"{'numOutputRows': '15', 'numOutputBytes': '321...",,Apache-Spark/3.3.2 Delta-Lake/2.3.0
2,3,2023-12-15 09:30:19.830,,,DELETE,"{'predicate': '[""(spark_catalog.default.custom...",,,,2.0,Serializable,False,"{'numRemovedBytes': '4424', 'numAddedFiles': '...",,Apache-Spark/3.3.2 Delta-Lake/2.3.0
3,2,2023-12-15 09:20:16.468,,,UPDATE,{'predicate': '(cast(id#1176 as int) < 50)'},,,,1.0,Serializable,False,"{'numRemovedBytes': '4473', 'numAddedFiles': '...",,Apache-Spark/3.3.2 Delta-Lake/2.3.0
4,1,2023-12-15 09:18:35.540,,,WRITE,"{'mode': 'Append', 'partitionBy': '[]'}",,,,0.0,Serializable,True,"{'numOutputRows': '32', 'numOutputBytes': '447...",,Apache-Spark/3.3.2 Delta-Lake/2.3.0
5,0,2023-12-15 09:15:09.288,,,CREATE TABLE,"{'description': None, 'partitionBy': '[]', 'pr...",,,,,Serializable,True,{},,Apache-Spark/3.3.2 Delta-Lake/2.3.0


In [39]:
from delta.tables import DeltaTable

# Inicjalizacja ścieżki do tabeli Delta
delta_path = "/tmp/delta-customers"
cust_delta_table = DeltaTable.forPath(spark, delta_path)

# Pobranie pełnej historii tabeli Delta
full_hist_df = cust_delta_table.history().select(
    "version", "timestamp", "operation", "isBlindAppend", "operationMetrics"
)

In [40]:
full_hist_df.toPandas()

Unnamed: 0,version,timestamp,operation,isBlindAppend,operationMetrics
0,5,2023-12-15 09:46:45.373,MERGE,False,"{'numOutputRows': '33', 'numTargetBytesAdded':..."
1,4,2023-12-15 09:31:01.423,WRITE,True,"{'numOutputRows': '15', 'numOutputBytes': '321..."
2,3,2023-12-15 09:30:19.830,DELETE,False,"{'numRemovedBytes': '4424', 'numAddedFiles': '..."
3,2,2023-12-15 09:20:16.468,UPDATE,False,"{'numRemovedBytes': '4473', 'numAddedFiles': '..."
4,1,2023-12-15 09:18:35.540,WRITE,True,"{'numOutputRows': '32', 'numOutputBytes': '447..."
5,0,2023-12-15 09:15:09.288,CREATE TABLE,True,{}


**Kilka pytań**

Zwróć uwagę, że niektóre z tych operacji mają flagę `isBlindAppend` zapaloną. **Które to były operacje?**

Każda zmiana - polecenia: `update`, `delete` czy `merge` "usuwała" dużą część (być może nawet wszystkie) aktywnych plików zastępując je nowymi. 

Co mogłoby spowodować, że liczba zastępowanych plików nie obejmowałaby wszystkich aktywnych plików?

# Podróż w czasie

Delta Lake to nie tylko wydajna obsługa (i możliwość wykonania) operacji DML, 
to także łatwość w odzyskiwaniu zniszczonych danych.

## Podstawy - czas i numery wersji

Na początku trochę podstaw. 

Sprawdź czas wykonania Twojej operacji `update`, która zmieniła kraj niektórym klientom na `Poland`.

In [41]:
from delta.tables import DeltaTable
from pyspark.sql import functions as F

# Inicjalizacja ścieżki do tabeli Delta
delta_path = "/tmp/delta-customers"
delta_table = DeltaTable.forPath(spark, delta_path)

# Pobranie historii operacji UPDATE
df = delta_table.history().where(F.col("operation") == "UPDATE").select("timestamp", "version")

In [42]:
df.toPandas()

Unnamed: 0,timestamp,version
0,2023-12-15 09:20:16.468,2


Skorzystaj teraz z tej daty, aby w poniższym paragrafie uzyskać dane jakie obowiązywały przed tą datą. 

In [48]:
df = (
    spark.read
    .format("delta")
    .option("timestampAsOf", "2023-12-15 09:20:16.467")
    .load("/tmp/delta-customers")
    .where(col("country").like("P%"))
)

In [49]:
df.toPandas()

                                                                                

Unnamed: 0,id,name,address,zipcode,city,country,effectiveDate
0,20,Kathleen Pugh,7018 Cras St.,3123,Ostrowiec Świętokrzyski,Peru,16-11-2020
1,28,Shaine Puckett,Ap #579-2185 Sed Street,85629,Cochrane,Poland,07-12-2020
2,68,Clarke Carlson,2296 Vestibulum St.,163826,Logan City,Poland,28-10-2020
3,74,Vernon Casey,650-5308 Felis Rd.,42987,Mirpur,Pakistan,14-11-2020


In [50]:
# to samo jest możliwe za pomocą numeru wersji -- popraw go na właściwy
df = (
    spark.read
    .format("delta")
    .option("versionAsOf", "1")
    .load("/tmp/delta-customers")
    .where(col("country").like("P%"))
)

In [51]:
df.toPandas()

                                                                                

Unnamed: 0,id,name,address,zipcode,city,country,effectiveDate
0,20,Kathleen Pugh,7018 Cras St.,3123,Ostrowiec Świętokrzyski,Peru,16-11-2020
1,28,Shaine Puckett,Ap #579-2185 Sed Street,85629,Cochrane,Poland,07-12-2020
2,68,Clarke Carlson,2296 Vestibulum St.,163826,Logan City,Poland,28-10-2020
3,74,Vernon Casey,650-5308 Felis Rd.,42987,Mirpur,Pakistan,14-11-2020


Skorzystaj z tej daty ponownie w poniższym paragrafie, 
aby tym razem uzyskać dane jakie obowiązywały po modyfikacji.

In [52]:
df = (
    spark.read
    .format("delta")
    .option("timestampAsOf", "2023-12-15 09:20:16.468")
    .load("/tmp/delta-customers")
    .where(col("country").like("P%"))
)

In [53]:
df.toPandas()

                                                                                

Unnamed: 0,id,name,address,zipcode,city,country,effectiveDate
0,10,Jin Terry,467-8297 Enim,35633573,Balıkesir,Poland,06-11-2020
1,11,Isabelle Stevenson,131-4245 Eleifend. Street,16142,Hà Giang,Poland,25-11-2020
2,14,Leo Mcleod,467-8297 Enim,39153,Borås,Poland,16-10-2020
3,16,Kaitlin Landry,623-5682 Augue St.,351225,Libramont-Chevigny,Poland,29-10-2020
4,19,Alden Harper,Ap #579-2185 Sed Street,94671-72608,Châtellerault,Poland,06-12-2020
5,2,Brandon Christian,476-5064 Suspendisse Rd.,93-765,Broxburn,Poland,28-11-2020
6,20,Kathleen Pugh,7018 Cras St.,3123,Ostrowiec Świętokrzyski,Poland,16-11-2020
7,26,Ulysses Dillard,1318 Tempor Rd.,S5J 6Z2,Tuscaloosa,Poland,11-10-2020
8,28,Shaine Puckett,Ap #579-2185 Sed Street,85629,Cochrane,Poland,07-12-2020
9,33,Alexander Becker,Ap #631-7469 Curae St.,29941,Anseong,Poland,04-11-2020


In [54]:
# to samo jest możliwe za pomocą numeru wersji -- popraw go na właściwy
df = (
    spark.read
    .format("delta")
    .option("versionAsOf", "2")
    .load("/tmp/delta-customers")
    .where(col("country").like("P%"))
)

In [55]:
df.toPandas()

                                                                                

Unnamed: 0,id,name,address,zipcode,city,country,effectiveDate
0,10,Jin Terry,467-8297 Enim,35633573,Balıkesir,Poland,06-11-2020
1,11,Isabelle Stevenson,131-4245 Eleifend. Street,16142,Hà Giang,Poland,25-11-2020
2,14,Leo Mcleod,467-8297 Enim,39153,Borås,Poland,16-10-2020
3,16,Kaitlin Landry,623-5682 Augue St.,351225,Libramont-Chevigny,Poland,29-10-2020
4,19,Alden Harper,Ap #579-2185 Sed Street,94671-72608,Châtellerault,Poland,06-12-2020
5,2,Brandon Christian,476-5064 Suspendisse Rd.,93-765,Broxburn,Poland,28-11-2020
6,20,Kathleen Pugh,7018 Cras St.,3123,Ostrowiec Świętokrzyski,Poland,16-11-2020
7,26,Ulysses Dillard,1318 Tempor Rd.,S5J 6Z2,Tuscaloosa,Poland,11-10-2020
8,28,Shaine Puckett,Ap #579-2185 Sed Street,85629,Cochrane,Poland,07-12-2020
9,33,Alexander Becker,Ap #631-7469 Curae St.,29941,Anseong,Poland,04-11-2020


## Rollback

### Zadanie 6

Wyobraź sobie, że ta operacja zmiany kraju na wartość `Poland` okazała się być błędna. 
Twoim zadaniem jest przywrócić wartości, które zostały nadpisane przez tą modifikację. 
Nie naprawiaj danych jeśli pojawiły się późniejsze (po Twoim poleceniu `update`) aktualizacje adresu (skorzystaj z `effectiveDate`).
Świetnie do takiej naprawy może się przydać operacja `merge`. Tym razem będzie ona miała tylko sekcję `WHEN MATCHED THEN`.

Oczywiście nie korzystaj z danych źródłowych - ich już nie ma. Jest tylko Twoja tabela Delta Lake. 

Skorzystaj z DataFrame API.

In [56]:
# błędne dane nie tylko są w historii, one są nadal w bieżącej postaci naszych danych
df = spark.sql("""
select * 
from customers 
where country like 'P%' 
  and to_date(effectiveDate,'dd-MM-yyyy') < date '2021-01-01'""")

In [57]:
df.toPandas()

                                                                                

Unnamed: 0,id,name,address,zipcode,city,country,effectiveDate
0,10,Jin Terry,467-8297 Enim,35633573,Balıkesir,Poland,06-11-2020
1,11,Isabelle Stevenson,131-4245 Eleifend. Street,16142,Hà Giang,Poland,25-11-2020
2,16,Kaitlin Landry,623-5682 Augue St.,351225,Libramont-Chevigny,Poland,29-10-2020
3,19,Alden Harper,Ap #579-2185 Sed Street,94671-72608,Châtellerault,Poland,06-12-2020
4,2,Brandon Christian,476-5064 Suspendisse Rd.,93-765,Broxburn,Poland,28-11-2020
5,20,Kathleen Pugh,7018 Cras St.,3123,Ostrowiec Świętokrzyski,Poland,16-11-2020
6,26,Ulysses Dillard,1318 Tempor Rd.,S5J 6Z2,Tuscaloosa,Poland,11-10-2020
7,28,Shaine Puckett,Ap #579-2185 Sed Street,85629,Cochrane,Poland,07-12-2020
8,33,Alexander Becker,Ap #631-7469 Curae St.,29941,Anseong,Poland,04-11-2020
9,41,Plato Vaughan,Ap #502-453 Non Rd.,06255-18554,Zoerle-Parwijs,Poland,25-11-2020


### Rozwiązanie zadania 6

In [60]:
from delta.tables import DeltaTable

# Inicjalizacja ścieżki do tabeli Delta
delta_path = "/tmp/delta-customers"
delta_table = DeltaTable.forPath(spark, delta_path)

# Wczytanie poprawnych danych z tabeli Delta na podstawie numeru wersji
poprawne_df = ( spark.read \
.format("delta") \
.option("versionAsOf", "1") \
.load(delta_path) \
)

# Wykonanie operacji MERGE na tabeli Delta
delta_table.alias("old") \
.marge(poprawne_df.alias("new"), "old.id = new.id and to_date(old.effectiveDate,'dd-MM-yyyy') < date '2021-01-01'") \
.whenMatchedUpdate(set = { "country" : "new.country"}) \
.execute()

AttributeError: 'DeltaTable' object has no attribute 'marge'

Sprawdź czy operacja przywracania poprzedniej wersji danych się powiodła. 

In [None]:
# błędne dane nie tylko są w historii, one są nadal w bieżącej postaci naszych danych
df = spark.sql("""
select * 
from customers 
where country like 'P%' 
  and to_date(effectiveDate,'dd-MM-yyyy') < date '2021-01-01'""")

In [None]:
df.toPandas()

Oczywiście należy pamiętać, że powyższa funkcjonalność to nie podróż w czasie w dowolnym zakresie. 

Nieaktualne pliki są sukcesywnie usuwane. 

Niemniej, prosta możliwość naprawy popełnionego właśnie przed chwilą błędu jest nieoceniona w świecie Big Data. To dlatego świat Big Data z zasady nie pozwala niczego modyfikować, a tworzy jedynie nowe na podstawie starego. W przypadku problemów stare, może zostać wykorzystane. 

Dokładnie to dzieje się pod maską w Delta Lake.

# Zmiany schematu

Zmiana zawartości danych to jedno. Ale świat zmienia się znacznie bardziej. Wymaga to często zmian definicji struktur naszych danych. 

Przykładowo, nasz atrybut `effectiveDate` jest ciągiem znaków, ale od dziś dobrze aby był datą. 
To samo dotyczy `id`. Od teraz wolimy, aby był liczbą. 

Tabele Delta Lake są przygotowane na takie modyfikacje. Wynika to poniekąd z formatu plików jaki jest wykorzystywany pod spodem. 

## Walidacja schematu 

Po pierwsze, Delta Lake dokonuje walidacji danych podczas wstawiania nowych danych. 

In [61]:
# przed zmianami
df = spark.sql("describe customers")

In [62]:
df.toPandas()

Unnamed: 0,col_name,data_type,comment
0,id,string,
1,name,string,
2,address,string,
3,zipcode,string,
4,city,string,
5,country,string,
6,effectiveDate,string,
7,,,
8,# Partitioning,,
9,Not partitioned,,


Spróbuj wstawić dane z marca 2021, uzupełnione o dodatkową kolumnę `endDate`.
```python
select id, 
       name, address, zipcode, city, country, 
       effectiveDate, 
       cast(null as date) as endDate
from  (
    select id, name, address, zipcode, city, country, effectiveDate,
           rank() over (partition by id order by to_date(effectiveDate,"dd-MM-yyyy") desc) as version
    from   source_data
    where  to_date(effectiveDate,"dd-MM-yyyy") >= date "2021-03-01"
    and    to_date(effectiveDate,"dd-MM-yyyy") < date "2021-04-01"
    )
where version = 1
```

In [63]:
spark.sql("""
INSERT INTO customers
select id, 
       name, address, zipcode, city, country, 
       effectiveDate, 
       cast(null as date) as endDate
from  (
    select id, name, address, zipcode, city, country, effectiveDate,
           rank() over (partition by id order by to_date(effectiveDate,"dd-MM-yyyy") desc) as version
    from   source_data
    where  to_date(effectiveDate,"dd-MM-yyyy") >= date "2021-03-01"
    and    to_date(effectiveDate,"dd-MM-yyyy") < date "2021-04-01"
    )
where version = 1""")

                                                                                

DataFrame[]

Dzięki parametrowi `spark.databricks.delta.schema.autoMerge.enabled=true` doszło do automatycznej integracji schematu docelowej tabeli z postacią danych, które były do niej wstawiane. 

Kolumna `endDate` jest nam potrzebna i będzie wykorzystywana w następnym zadaniu. 

W wersjach Delta Lake 2.2 i wcześniejszych należało wykorzystać poniższe rozwiązanie do uzyskania takiej integracji

```python
# Wybór danych z okresu od '2021-03-01' do '2021-04-01'
data_032021 = (
    source_data
    .filter((to_date("effectiveDate", "dd-MM-yyyy") >= '2021-03-01') & (to_date("effectiveDate", "dd-MM-yyyy") < '2021-04-01'))
    .withColumn("version", row_number().over(Window.partitionBy("id").orderBy(col("effectiveDate").desc())))
    .filter("version = 1")
    .withColumn("endDate", lit(None).cast("date"))
    .select("id", "name", "address", "zipcode", "city", "country", "effectiveDate", "endDate")
)

# Zapis danych do tabeli Delta
data_032021.write.option("mergeSchema", "true").format("delta").mode("append").saveAsTable("customers")
```


Sprawdź jak wygląda schemat Twojej tabeli po zmianach 

In [64]:
df = spark.sql("describe customers")

In [65]:
df.toPandas()

Unnamed: 0,col_name,data_type,comment
0,id,string,
1,name,string,
2,address,string,
3,zipcode,string,
4,city,string,
5,country,string,
6,effectiveDate,string,
7,endDate,date,
8,,,
9,# Partitioning,,


 
Niektóre zmiany schematu wymagają nadpisania całej zawartości tabeli 

`.mode("overwrite").option("overwriteSchema", "true")`. 

Źródłem danych może być oczywiście ta sama tabela, w tym również jej poprzednia wersja.

## Zadanie 7

Korzystając z tego mechanizmu zmień typy kolumn, o których wspominaliśmy.
Przy okazji wycofaj ostatnią aktualizację. Nie była przemyślana - dodaliśmy nowe dane (`append`) a powinniśmy uwzględnić fakt, że nie wszyscy klienci w nowym zestawie danych byli nowi, część z nich wymagała aktualizacji. W rezultacie pojawiły się duplikaty w naszych danych. 

Sprawdź, której wersji danych potrzebujesz.

In [66]:
from delta.tables import DeltaTable

# Inicjalizacja ścieżki do tabeli Delta
delta_path = "/tmp/delta-customers"
cust_delta_table = DeltaTable.forPath(spark, delta_path)

# Pobranie pełnej historii tabeli Delta
full_hist_df = cust_delta_table.history().select(
    "version", "timestamp", "operation", "isBlindAppend", "operationMetrics"
)

                                                                                

In [67]:
full_hist_df.toPandas()

Unnamed: 0,version,timestamp,operation,isBlindAppend,operationMetrics
0,6,2023-12-15 10:08:53.693,WRITE,True,"{'numOutputRows': '15', 'numOutputBytes': '343..."
1,5,2023-12-15 09:46:45.373,MERGE,False,"{'numOutputRows': '33', 'numTargetBytesAdded':..."
2,4,2023-12-15 09:31:01.423,WRITE,True,"{'numOutputRows': '15', 'numOutputBytes': '321..."
3,3,2023-12-15 09:30:19.830,DELETE,False,"{'numRemovedBytes': '4424', 'numAddedFiles': '..."
4,2,2023-12-15 09:20:16.468,UPDATE,False,"{'numRemovedBytes': '4473', 'numAddedFiles': '..."
5,1,2023-12-15 09:18:35.540,WRITE,True,"{'numOutputRows': '32', 'numOutputBytes': '447..."
6,0,2023-12-15 09:15:09.288,CREATE TABLE,True,{}


### Rozwiązanie zadania 7

In [71]:
from pyspark.sql.functions import to_date, col, lit

# Wczytanie danych z tabeli Delta na podstawie numeru wersji
customers_df = (
  spark.read.format("delta")
  .option("versionAsOf", "6")
  .table("customers")
  .withColumn("effectiveDate", to_date(col("effectiveDate"), "dd-MM-yyyy"))
  .withColumn("id", col("id").cast("int"))
  .withColumn("endDate", lit(None).cast("date")
))

# Zapis danych do tabeli Delta
(
    customers_df.write.format("delta")
    .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable("customers")
)

                                                                                

In [73]:
# odczekaj chwilę aby zmiany doszły do skutku
df = spark.sql("describe customers")

In [None]:
df.toPandas()

Zmiany można także wymusić ręcznie. 
Dodaj jeszcze jedną kolumnę za pomocą poniższego paragrafu.

In [None]:
spark.sql("""
ALTER TABLE customers ADD COLUMNS (current boolean COMMENT 'if current data' AFTER country)
""")

In [None]:
df = spark.sql("describe customers")

In [None]:
df.toPandas()

# Finał 

Na zakończenie zaimplementujemy sposób utrzymywania tabel wymiarów jaki często wykorzystywany jest w hurtowniach danych.

## Slowly changing data (SCD) Type 2

Nasza tabela `customers` jest już na to gotowa. 
Kolumna `current` powinna być zapalona tylko dla najnowszych wersji danych. 
Kolumna `endDate` powinna mieć wartość zakończenia obowiązywania danej wersji danych jeśli pojawi się nowy wiersz z nową wersją danych. 



## Zadanie 8

**Zaimplementuj** funkcję `updateCustomers`, która na podstawie danych źródłowych z kolejnego *miesiąca* (parametr funkcji) będzie aktualizowała zawartość tabeli `customers` zgodnie a regułami ***SCD Type 2***.  

Po zakończonej implementacji **sprawdź jej działanie**. 

Jeśli uważasz, że obecne dane w tabeli customers powinny zostać poprawione, **dokonaj wcześniej stosownych korekt**. 

Jeśli chcesz skorzystaj ze strony:
https://docs.delta.io/latest/delta-update.html#-merge-in-scd-type-2



## Rozwiązanie zadania 8

In [None]:
# na początku korekty... raczej są potrzebne 


In [None]:
# Twoja funkcja


Masz to? 

Jeśli tak, to przyjmij gratulacje. Sprawdźmy jak to działa dla danych z marca, które wycofaliśmy. 

Na początku zobaczmy jakie to będą dane.

In [None]:
data_032021 = spark.sql("""
select id, 
       name, address, zipcode, city, country, 
       effectiveDate, 
       cast(null as date) as endDate
from  (
    select id, name, address, zipcode, city, country, effectiveDate,
           rank() over (partition by id order by to_date(effectiveDate,"dd-MM-yyyy") desc) as version
    from   source_data
    where  to_date(effectiveDate,"dd-MM-yyyy") >= date "2021-03-01"
    and    to_date(effectiveDate,"dd-MM-yyyy") < date "2021-04-01"
    )
where version = 1""")

In [None]:
data_032021.toPandas()

Zobaczmy ile z tych nowych wersji klientów istnieje już w naszych danych

In [None]:
from pyspark.sql.functions import col

# Wybór identyfikatorów z ramki danych data_032021
new_ids = data_032021.select("id").collect()
new_ids_list = [str(row.id) for row in new_ids]
new_ids_str = ",".join(new_ids_list)

# Wybór danych z tabeli customers, gdzie id znajduje się w new_ids_str
df = spark.sql(f"""
    SELECT *
    FROM customers
    WHERE id IN ({new_ids_str})
""")

In [None]:
df.toPandas()

Uruchom swoją funkcję

In [None]:
updateCustomers(data032021)

Sprawdźmy jak wyglądają stara i nowa wersja jednego ze zaktualizowanych klientów.

In [None]:
df = spark.sql("""
select * from customers
where id = 33 """)

In [None]:
df.toPandas()

Jeśli poprzednia wersja została zamknięta z odpowiednią datą a nowa z tą samą datą utworzona... 

to osiągneliśmy to o co nam chodziło... Delta Lake.