# Zadanie
Posiadamy dwa źródła danych: tabela (w tym wypadku plik CSV) oraz informacje od naszych partnerów za pomocą ich API. (w tym wypadku plik JSON)

## Wymagania:
1. Informacje są przedstawione w jednej strukturze.
2. Nie tracimy żadnych informacji.
3. Dane muszą być proste do aktualizacji oraz do tworzenia raportów rocznych i miesięcznych.
4. Nie potrzebujemy dokładnych informacji o kliencie, wystarczy "sygnatura".

In [0]:
display(
    spark.read.csv("/FileStore/example_table_data.csv", header=True)
)

display(
    spark.read.format("json").option("multiline", "True").load("/FileStore/example_api_data.json")
)

ID,first_name,last_name,payment_method,payment_date,amount
1,Michael,Hodges,Visa,2024-10-01,975.76
2,Edward,Ward,Visa,2024-08-29,946.25
3,Nicholas,Wright,American Express,2024-09-14,28.51
4,Ray,Carr,Cash,2024-08-08,189.66
5,Brittany,Miranda,American Express,2024-09-17,95.29
6,Kelly,Galloway,Mastercard,2024-10-08,834.16
7,Morgan,Erickson,Visa,2024-10-21,943.17
8,Glen,Mayer,PayPal,2024-09-29,977.32
9,Edgar,Trevino,Mastercard,2024-10-01,895.36
10,David,Carter,PayPal,2024-09-27,316.79


amount,id,payment_date,payment_method,user_name
126.33,1,2024-09-13,Google Pay,steven43
779.35,2,2024-09-21,Apple Pay,qsmith
917.78,3,2024-09-02,MasterCard,gbennett
59.06,4,2024-09-29,Visa,ralvarez
735.56,5,2024-08-11,MasterCard,benjamincarlson
787.46,6,2024-10-27,MasterCard,fischerbrandy
844.32,7,2024-11-02,Google Pay,francochristopher
521.09,8,2024-09-01,Wallet,angela06
974.05,9,2024-09-26,Apple Pay,emilylarsen
145.92,10,2024-08-21,Visa,ryancunningham


In [0]:
import pyspark.sql.functions as F
import pyspark.sql.types as T
df_table = spark.read.csv("/FileStore/example_table_data.csv", header=True)
df_api = spark.read.format("json").option("multiline", "True").load("/FileStore/example_api_data.json")

In [0]:
df_table = (
    df_table.select(
        F.concat(F.lit("T"), F.col("id")).alias("id"),
        F.concat_ws(" ", F.col("first_name"), F.col("last_name")).alias("sygnature"),
        F.col("payment_method"),
        F.col("amount").cast(T.DoubleType()),
        F.col("payment_date").cast(T.DateType()),
        F.substring(F.col("payment_date"), 1, 4).cast(T.IntegerType()).alias("year"),
        F.substring(F.col("payment_date"), 6, 2).cast(T.IntegerType()).alias("month"),
    )
)

In [0]:
df_api = (
    df_api.select(
        F.concat(F.lit("A"), F.col("id")).alias("id"),
        F.col("user_name").alias("sygnature"),
        F.col("payment_method"),
        F.col("amount"),
        F.col("payment_date").cast(T.DateType()),
        F.substring(F.col("payment_date"), 1, 4).cast(T.IntegerType()).alias("year"),
        F.substring(F.col("payment_date"), 6, 2).cast(T.IntegerType()).alias("month"),
    )
)

In [0]:
df_table.unionAll(df_api).write.mode("overwrite").partitionBy("year", "month").format("delta").save("/FileStore/result.delta")

In [0]:
dbutils.fs.ls("/FileStore/result.delta/")

[FileInfo(path='dbfs:/FileStore/result.delta/_delta_log/', name='_delta_log/', size=0, modificationTime=1730837347000),
 FileInfo(path='dbfs:/FileStore/result.delta/year=2024/', name='year=2024/', size=0, modificationTime=1730837350000)]