In [None]:
%python
import os

# Storage account endpoint that every setting below is scoped to.
STORAGE_HOST = "stdesafioraizen.dfs.core.windows.net"

# The SAS token is a credential and must never be stored in the notebook
# itself. It is read from the environment instead; how the variable gets
# populated (cluster configuration, secret store, ...) is up to the deployment.
SAS_TOKEN_ENV_VAR = "AZURE_STORAGE_SAS_TOKEN"
sas_token = os.environ.get(SAS_TOKEN_ENV_VAR)
if not sas_token:
    raise RuntimeError(
        f"Environment variable {SAS_TOKEN_ENV_VAR} is not set; "
        f"it must contain the SAS token for {STORAGE_HOST}."
    )

# Authenticate against the storage account with a fixed SAS token.
spark.conf.set(f"fs.azure.account.auth.type.{STORAGE_HOST}", "SAS")
spark.conf.set(
    f"fs.azure.sas.token.provider.type.{STORAGE_HOST}",
    "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider",
)
spark.conf.set(f"fs.azure.sas.fixed.token.{STORAGE_HOST}", sas_token)

In [None]:
%python
import pathlib
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, regexp_replace, lit,format_string,current_timestamp,to_date

In [None]:
%python
def unpivot_table(path):
    """Turn one wide CSV into a long DataFrame with one row per value cell.

    The CSV is read with a header and schema inference. The columns
    ``Dados``, ``product``, ``unit`` and ``uf`` identify a row; every other
    column is treated as a value column. For each value column, the rows where
    it is not null are emitted with that value as ``volume``.

    The year is the last ``_``-separated token of the value column's name, and
    the month comes from ``Dados``: a Portuguese month name at the end of the
    string is replaced by its two-digit number. Both are combined as
    ``yyyy/MM`` and parsed into the ``year_month`` date.

    No temporary views are registered; everything goes through the DataFrame
    API, so column names are never interpolated into SQL text.

    Args:
        path: Location of the CSV file, passed to ``spark.read.csv``.

    Returns:
        A DataFrame with the columns ``year_month``, ``uf``, ``product``,
        ``unit``, ``volume`` and ``created_at`` (the current timestamp).

    Raises:
        ValueError: If the file has no column besides the identifier columns.
    """
    id_columns = ['Dados', 'product', 'unit', 'uf']
    month_numbers = [
        ('JANEIRO', '01'),
        ('FEVEREIRO', '02'),
        ('MARÇO', '03'),
        ('ABRIL', '04'),
        ('MAIO', '05'),
        ('JUNHO', '06'),
        ('JULHO', '07'),
        ('AGOSTO', '08'),
        ('SETEMBRO', '09'),
        ('OUTUBRO', '10'),
        ('NOVEMBRO', '11'),
        ('DEZEMBRO', '12'),
    ]

    df = spark.read.csv(path, inferSchema=True, header=True)

    # Keep the header order so the order of the unioned slices is stable
    # (a set difference would make it arbitrary).
    value_columns = [name for name in df.columns if name not in id_columns]
    if not value_columns:
        raise ValueError(f"no value columns to unpivot in {path!r}")

    # Build the month-name -> month-number expression once; it does not depend
    # on the value column. Values that match no month name are kept unchanged.
    month_source = df['Dados']
    month_num = None
    for month_name, number in month_numbers:
        matches = month_source.endswith(month_name)
        replaced = regexp_replace(month_source, month_name, number)
        if month_num is None:
            month_num = when(matches, replaced)
        else:
            month_num = month_num.when(matches, replaced)
    month_num = month_num.otherwise(month_source)

    df_export = None
    for value_column in value_columns:
        year = value_column.split('_')[-1]
        year_month = to_date(
            format_string("%s/%s", lit(year), month_num),
            "yyyy/MM",
        )

        df_slice = (
            df.where(df[value_column].isNotNull())
              .select(
                  year_month.alias('year_month'),
                  df['uf'],
                  df['product'],
                  df['unit'],
                  df[value_column].alias('volume'),
                  current_timestamp().alias('created_at'),
              )
        )

        # Explicit "first slice" check: a failing union must surface as an
        # error instead of silently discarding the slices gathered so far.
        if df_export is None:
            df_export = df_slice
        else:
            df_export = df_export.unionAll(df_slice)

    return df_export

In [None]:
%python
def join_csv(list_files):
    """Unpivot every file in ``list_files`` and stack the results.

    Each entry is converted to ``str``, printed as a progress message, passed
    to ``unpivot_table`` and appended to the running result with ``unionAll``.

    Args:
        list_files: Iterable of file locations; each item is converted with
            ``str()`` before use.

    Returns:
        The union of the DataFrames returned by ``unpivot_table`` for all
        files, in iteration order.

    Raises:
        ValueError: If ``list_files`` yields no entries.
    """
    df_export = None

    for file in list_files:
        file = str(file)
        print(file)
        df = unpivot_table(file)

        # Explicit "first file" check: a failing union must surface as an
        # error instead of silently discarding the files combined so far.
        if df_export is None:
            df_export = df
        else:
            df_export = df_export.unionAll(df)

    if df_export is None:
        raise ValueError("list_files is empty; there is nothing to combine")

    return df_export

In [None]:
%python
from pyspark.sql import SparkSession

# Get the active Spark session, creating one named "ListFiles" if needed.
spark = (
    SparkSession.builder
    .appName("ListFiles")
    .getOrCreate()
)

# Folder whose files are going to be processed.
path = "abfs://desafioraizen@stdesafioraizen.dfs.core.windows.net/processing/"

# List every file below `path`, descending into sub-folders.
file_list = (
    spark.read
    .format("binaryFile")
    .option("recursiveFileLookup", "true")
    .load(path)
)

# Show the untruncated paths, then bring them to the driver.
file_list.select("path").show(truncate=False)
file_paths_list = file_list.select("path").collect()

# Plain list of path strings extracted from the collected rows.
file_paths = [record["path"] for record in file_paths_list]

print(file_paths)


In [None]:
%python
# Combine every listed file into a single long-format DataFrame.
df = join_csv(list_files=file_paths)

# Preview up to 1000 rows of the combined result.
df.show(n=1000)

In [None]:
# Persist the combined data as Parquet, replacing any previous output and
# partitioning the files by product and uf.
(
    df.write
    .mode("overwrite")
    .partitionBy("product", "uf")
    .parquet("abfs://desafioraizen@stdesafioraizen.dfs.core.windows.net/compiled/")
)