#### Explicació del script Silver to Gold
- Quin objectiu té la capa Gold en aquest cas?
  - L'objectiu principal de la capa Gold és transformar les dades emmagatzemades en la capa Silver en un format i una estructura optimitzats per a l'anàlisi i la presa de decisions. Aquesta capa proporciona les dades preparades per a la creació de gràfics, estadístiques i altres processos.

- Quins processos i accions realitzem en aquest script? (EXPLICACIÓ PAS A PAS)

  1- Lectura de les dades de la capa Silver:
    - Iniciem llegint les dades emmagatzemades en format Parquet a la capa Silver. Utilitzem aquestes dades per crear un DataFrame de Pandas.

  2- Selecció de les dades importants:
    - Creem diferents funcions per seleccionar les dades específiques que necessitem de la capa Silver. Això pot incloure la filtració de columnes, la transformació de les dades i altres manipulacions necessàries.

  3- Estructuració dels DataFrames:
    - Creem dos DataFrames amb l'estructura òptima per a les nostres necessitats. 

  4- Creació de taules a Azure SQL Server:
    - En cas que les taules necessàries encara no existeixin a la base de dades Azure SQL Server, les creem en aquest pas. És important tenir les taules preparades abans de penjar les dades per garantir que tinguin l'estructura correcta per a l'emmagatzematge a la capa Gold.

  5- Emmagatzematge de les dades a la capa Gold:
    - Finalment, emmagatzemem les dades transformades i seleccionades a la capa Gold, que en aquest cas és una base de dades Azure SQL Server. Això ens permet tenir accés a les dades en un format estructurat i optimitzat per a l'anàlisi i la generació de gràfiques.

Finalment, emmagatzemem les dades transformades i seleccionades a la capa Gold, que en aquest cas és una base de dades Azure SQL Server. Això ens permet tenir accés a les dades en un format estructurat i optimitzat per a l'anàlisi i la generació de coneixement i conclusions.

#### Importem les llibreries necessàries

In [0]:
import pandas as pd
from pyspark.sql.functions import col, to_json


#### Configuració connexió spark.
- En aquest codi podem veure les variables que necessitem per realitzar la connexió amb spark. I com creem l'objecta spark amb aquestes variables.
- La connexió en aquest cas ho fem amb SAS, ja que només ens fa falta un token per poder-la establir. Hi ha més maneras que és pot realitzar la connexió.

In [0]:
# Configuració Spark per connexió ADLG2 amb token SAS
token = 'sv=2022-11-02&ss=bfqt&srt=sco&sp=rwdlacupyx&se=2025-06-03T02:34:04Z&st=2024-04-09T18:34:04Z&spr=https&sig=dplYYYNSRC%2FjDR89WMxcsA6SMo%2BHFYrQ5BqdhRUKBMs%3D'
storage_account = 'projecteiabd'
container = 'silver'

spark.conf.set("fs.azure.account.auth.type.{0}.dfs.core.windows.net".format(storage_account), "SAS")
spark.conf.set("fs.azure.sas.token.provider.type.{0}.dfs.core.windows.net".format(storage_account), "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set("fs.azure.sas.fixed.token.{0}.dfs.core.windows.net".format(storage_account), token)


#### Declaració de funcions.
A continuació declararem les funcións que utilitzarem per tractar i penjar les dades a la capa Gold.

- **Funció per llegir tots els fitxer ".parquet" de la partició que li assignem.**
  - Aquesta funció ens permet llegir els fitxers .parquet del Azure Data Lake de una particó en concret.
  - Ens retorna totes les dades dins de un Data Frame.

- **Funció per llegir tiquets d'un client concret**.
  - Aquesta funció ens permet llegir la informació dels tiquets del client que l'indiquem.
  - Retorna un Data Frame amb tota la informació dels tiquets del clients.

- **Funció per llegir tiquets d'un día concret**
  - Aquesta funció ens permet llegir la informació dels tiquets de un día concret.
  - Retorna un Data Frame amb tota la informació dels tiquets del día concret.

In [0]:
# Funció per llegir tots els fitxer ".parquet" de la partició que li assignem.

def read_all_silver(particio, cotainer, storage_account):
     docs = []

     # Li diguem la carpeta on tenim les particons. I ja els ha llegeix totes automàticament, no fa falta la funció recursiva.
     path = 'abfs://{0}@{1}.dfs.core.windows.net/{2}'.format(container, storage_account,particio)
     
     df = spark.read.parquet(path)
     
     return df




In [0]:
# Funció per llegir tiquets d'un client concret
def read_ticket_client(id_client, cotainer, storage_account):
     docs = []

     # Li diguem la carpeta on tenim les particons. I ja els ha llegeix totes automàticament, no fa falta la funció recursiva.
     path = 'abfs://{0}@{1}.dfs.core.windows.net/clients/id_client={2}'.format(container, storage_account, id_client)
     
     df = spark.read.parquet(path)
     
     return df

In [0]:
# # Cridem la funció anterior per mostrar un exemple del funcionament.
# df_tiquet = read_ticket_client('1', container, storage_account)

# # Convertim el resultat en un df de pandas.
# df_tiquet.toPandas() 

In [0]:
# Funció per llegir tiquets d'un día concret
def read_ticket_data(data, cotainer, storage_account):
     docs = []

     # Li diguem la carpeta on tenim les particons. I ja els ha llegeix totes automàticament, no fa falta la funció recursiva.
     path = 'abfs://{0}@{1}.dfs.core.windows.net/dies/data={2}'.format(container, storage_account, data)
     
     df = spark.read.parquet(path)
     
     return df

In [0]:
# Cridem la funció anterior per mostrar un exemple del funcionament.
df_tiquet_dies = read_ticket_data('22-04-2024', container, storage_account)

# Convertim el resultat en un df de pandas.
df_tiquet_dies.toPandas() 

Unnamed: 0,ciutat,factura_simplificada,products,total_amb_IVA,id_tiquet,id_client,data_scanner_tiquet
0,BADALONA,1234-123-12345,"[{'descripcio': 'LASANYA', 'import': '17.60', ...",1760,24052024_173502,1,24-05-2024



#### Tractament de dades per penjar a SQL SERVER
- En aquest tractarem el df, per penjar-ho a la capa Gold que és un Azure SQL Server. 
- Per fer-ho creearem dos df amb l'estructura i dades que volem tenir a la taula recional SQL.

In [0]:
# En aquest cas per passar a la capa gold(SQL) agafem totes les dades de una partició sense importar quina sigui ja que de aquesta forma penjarem totes les dades a la capa Gold. 
# Que es el que ens interessa poder mostrar estadístiques i gràfiques de totes les dades en aquest cas.

# Llegim tots els parquets, amb la funció per llegir-ho tot de totes les particións.
df_parquet = read_all_silver('clients', container, storage_account)

# Convertim el resultat en un df de pandas.
df_parquet = df_parquet.toPandas() 
df_parquet

Unnamed: 0,ciutat,data,factura_simplificada,products,total_amb_IVA,id_tiquet,data_scanner_tiquet,id_client
0,BADALONA,22-04-2024,1234-123-12345,"[{'descripcio': 'LASANYA', 'import': '17.60', ...",1760,24052024_173502,24-05-2024,1


In [0]:
# Per mostrar-ho a power bi crearem els següents df que posteriorment penjarem a sql.
# tiquets(id_tiquet(CP), id_client, total_amb_IVA, data_scanner_tiquet, ciutat)
# productes(name, qty, format, p_u_amb_iva, preu_amb_iva, id_tiquet (CF))
product_data = []
tiquets_data = []

# Itearem sobre cada fila del df.
for index, row in df_parquet.iterrows():
    # Creearem el df de tiquet posant les dades que ens interessent.
    tiquets_data.append({
        'id_tiquet': row['id_tiquet'],
        'id_client': row['id_client'],
        'total_amb_IVA': row['total_amb_IVA'],
        'data_scanner_tiquet': row['data_scanner_tiquet'],
        'data_tiquet': row['data'],
        'ciutat': row['ciutat'],
    })
    # Per cada fila iterem per els productes.
    for producte in row['products']:
        # Creem una nova fila per producte i li associem valors de la fila. Com l'id tiquet... Que ens servirant per el Power Bi.
        # Anem afegint les dades / files a la llista.
        product_data.append({
            'descripcio': producte['descripcio'],
            'quantitat': producte['quantitat'],
            'preu unitari': producte['preu_unitari'],
            'import': producte['import'],
            'unitat': producte['unitat'],
            'id_tiquet': row['id_tiquet']
        })

# Converitm l'array de objectes amb un dataframe.
df_productes_bi = pd.DataFrame(product_data)
df_tiquets_bi = pd.DataFrame(tiquets_data)

#### Creació de l'estructura SQL
Abans de penjar les dades a les taules SQL, és crucial tenir les taules creades a la nostra base de dades SQL.

**Com hem definit l'estructura?**
  - Hem optat per dividir les dades en dues taules per facilitar l'anàlisi i la visualització. Això ens permetrà realitzar estadístiques i gràfics amb més facilitat, ja que evita la duplicació de dades i ens permet incorporar més informació.
  - Hem separat els camps que pertanyen als tiquets en general i els camps que descriuen les línies de productes dels tiquets.
  - Les dues taules estan relacionades pel camp 'id_tiquet', que actua com a clau forana a la taula 'lin_productes'. Hem afegit una clau primària ('PK') a la taula 'lin_productes' per identificar de manera única les línies dels tiquets.

**ESTRUCTURA DE LA BASE DE DADES**

**Taula: tiquets**
- Camps:
  - ciutat
  - data_scanner_tiquet
  - data_tiquet
  - id_client
  - id_tiquet (PK)
  - total_amb_IVA


**Taula: lin_productes**
- Camps:
  - descripcio
  - id_tiquet (FK)
  - import
  - preu_unitari
  - quanitat
  - unitat


#### Penjem les dades al servidor Azure SQL Server
Un cop tenim el següent preparat:
  - Les taules amb l'estructura necessària al servidor SQL.
  - Els DataFrame (DF) creats que volem penjar a les taules amb la mateixa estructura, ja que sinó no ens permetrà fer-ho.

Per a això, ens connectarem al SQL Server mitjançant el JDBC de SQL. Utilitzarem les credencials i dades necessàries (que obtenim d'Azure).

Un cop establerta la connexió, només caldrà escriure als DataFrame corresponents les taules:
  - Primer, convertirem els DF de Pandas a DF de Spark.
  - A continuació, posarem el mode 'overwrite' per sobrescriure el contingut i evitar problemes amb claus primàries duplicades.

In [0]:
# Cadena de connexió JDBC (llibreria de connexió)
jdbc_url = "jdbc:sqlserver://sql-otorrent-etl.database.windows.net:1433;database=sqldb-otorrent-etl;"

# Propietats de connexió (Dades que utiltizem per realitzar les connexións)
connection_properties = {
    "user": "oriol@sql-otorrent-etl",
    "password": "41598051KKk",
    "encrypt": "true",
    "trustServerCertificate": "false",
    "hostNameInCertificate": "*.database.windows.net",
    "loginTimeout": "30"
}

# Panjem els dfs al sql server.
spark.createDataFrame(df_tiquets_bi).write.jdbc(url=jdbc_url, table="dbo.tiquets", mode="overwrite", properties=connection_properties)
spark.createDataFrame(df_productes_bi).write.jdbc(url=jdbc_url, table="dbo.lin_prodcutes", mode="overwrite", properties=connection_properties)