In [1]:
%run "/home/jovyan/work/workspace/functions/create_spark_session.ipynb"

Spark Version: 3.5.0


In [2]:
%run "/home/jovyan/work/workspace/functions/paths.ipynb"

In [3]:
%run "/home/jovyan/work/workspace/functions/dates.ipynb"

In [4]:
import importlib
import subprocess
import sys

In [5]:
# Make sure the third-party `holidays` package is importable, installing it into
# this kernel's own interpreter (sys.executable) when it is missing.
# NOTE(review): the install is unpinned; consider pinning a version
# (e.g. "holidays==<x.y>") or baking it into the image for reproducible runs.
try:
    import holidays
    print("Lib 'holidays' ok!")
except ImportError:
    print("Instalando a lib holidays...")
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "holidays"])
    # The import system caches directory listings; invalidate them so the
    # freshly installed package is discoverable without restarting the kernel.
    importlib.invalidate_caches()
    import holidays
    print("Instalado com sucesso!")

Lib 'holidays' ok!


In [6]:
from pyspark.sql.functions import col, date_format, lit , to_date,trim,current_date, regexp_replace, weekday, day, weekofyear , when

from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, LongType, DateType,DecimalType,FloatType

In [7]:
# Load the silver-layer sales Delta table; `silver` is a path prefix
# presumably defined by paths.ipynb.
df = (
    spark.read
    .format('delta')
    .load(f'{silver}sales')
)

In [8]:
# Derive a coarse customer-group taxonomy (tpgroup) from the raw dsgroup value,
# attach a numeric code for each bucket (cdgroup), and keep only the columns the
# gold layer uses. The rules are evaluated top to bottom and the first match
# wins; anything unmatched (including NULL dsgroup) becomes "OTHER".
df = (
    df
    .withColumn(
        "tpgroup",
        when(col("dsgroup").contains("MILITARY/GOVT"), "GOV & MILITARY")
        .when(col("dsgroup").isin("WORKPLACE", "HEALTHCARE", "TRANSPORTATION", "FOOD SERVICE"), "SERVICES")
        .when(col("dsgroup").isin("CLUBS", "LEISURE"), "ENTERTAINMENT")
        .when(col("dsgroup").isin("RETAIL COLD", "MASS MERCHANDISER", "SUPERS", "CONVENIENCE RETAIL", "DRUG STORES"), "GROCERY")
        .when(col("dsgroup").contains("EDUCATION"), "ACADEMIC")
        .otherwise("OTHER"),
    )
    # Every tpgroup value produced above has a code, so cdgroup is never NULL.
    .withColumn(
        "cdgroup",
        when(col("tpgroup") == "GOV & MILITARY", 0)
        .when(col("tpgroup") == "SERVICES", 1)
        .when(col("tpgroup") == "ENTERTAINMENT", 2)
        .when(col("tpgroup") == "OTHER", 3)
        .when(col("tpgroup") == "GROCERY", 4)
        .when(col("tpgroup") == "ACADEMIC", 5),
    )
    .select(
        'dtregister', 'cdbrand', 'dsbrand', 'dsregion',
        'cdgroup', 'tpgroup', 'dsgroup', 'dschannel',
        'dspack', 'dscatpack', 'dsnmpack', 'qtvolume',
        'dsyear', 'dsmonth', 'dsquarter', 'dtload',
    )
)

### Product Dim

In [9]:
# Product dimension: distinct brand/pack combinations, stamped with the load
# date (dtcarga is presumably set by dates.ipynb).
df_dim_product = (
    df
    .select('cdbrand', 'dsbrand', 'dscatpack', 'dsnmpack')
    .distinct()
    .withColumn('dtload', to_date(lit(f'{dtcarga}')))
)

### Location Dim

In [10]:
# Location dimension: distinct regions, stamped with the load date.
df_dim_location = (
    df
    .select('dsregion')
    .distinct()
    .withColumn('dtload', to_date(lit(f'{dtcarga}')))
)

### Time Dim

In [11]:
# All distinct years present in the sales data, used to build the holiday
# calendar. Taking only `.collect()[0][0]` kept one arbitrary year, so multi-year
# loads lost holidays for the other years, and an empty load raised IndexError.
# holidays.* accepts an iterable of years, so a list works downstream.
# NOTE(review): int() assumes dsyear holds a numeric year (int or digit string) — confirm.
years = sorted(
    int(row['dsyear'])
    for row in df.select('dsyear').na.drop().distinct().collect()
)

In [12]:
# Combined US + Canadian public holiday calendar for the collected years.
years_holidays = (
    holidays.US(years=years)
    + holidays.CA(years=years)
)

In [13]:
# Turn the holiday calendar (date -> holiday name) into a small Spark lookup
# table that the time dimension can left-join on.
h_list = list(years_holidays.items())
h_schema = StructType([
    StructField("h_date", DateType(), True),
    StructField("holiday_name", StringType(), True),
])
df_holidays_ref = (
    spark
    .createDataFrame(h_list, schema=h_schema)
    .distinct()
)

In [14]:
# Time dimension: one row per distinct registration date with calendar
# attributes, a holiday name (dsholiday) and a 0/1 holiday flag (flholiday).
# NOTE: Spark's weekday() returns 0 = Monday ... 6 = Sunday, so dsdayofweek uses
# that numbering. The previous mapping started at Sunday, which labelled every
# date with the previous day's name.
df_dim_time = (
    df.selectExpr(
        'dtregister',
        'dsyear',
        'dsmonth',
        'dsquarter',
        'weekday(dtregister) as dsdayofweek',
        'day(dtregister) as dsday',
        'weekofyear(dtregister) as dsweekofyear',
    )
    .withColumn('nmdayofweek',
        when(col('dsdayofweek') == 0, "Monday")
        .when(col('dsdayofweek') == 1, "Tuesday")
        .when(col('dsdayofweek') == 2, "Wednesday")
        .when(col('dsdayofweek') == 3, "Thursday")
        .when(col('dsdayofweek') == 4, "Friday")
        .when(col('dsdayofweek') == 5, "Saturday")
        .when(col('dsdayofweek') == 6, "Sunday"))
    .withColumn('nmmonth',
        when(col('dsmonth') == 1, "January")
        .when(col('dsmonth') == 2, "February")
        .when(col('dsmonth') == 3, "March")
        .when(col('dsmonth') == 4, "April")
        .when(col('dsmonth') == 5, "May")
        .when(col('dsmonth') == 6, "June")
        .when(col('dsmonth') == 7, "July")
        .when(col('dsmonth') == 8, "August")
        .when(col('dsmonth') == 9, "September")
        .when(col('dsmonth') == 10, "October")
        .when(col('dsmonth') == 11, "November")
        .when(col('dsmonth') == 12, "December"))
    # Left join keeps every date; dsholiday stays NULL on non-holiday days.
    # NOTE(review): assumes dtregister is a DateType column so it matches h_date — confirm.
    .alias('time')
    .join(df_holidays_ref.alias('holidays'),
          col('time.dtregister') == col('holidays.h_date'),
          'left')
    .drop('h_date')
    .withColumnRenamed('holiday_name', 'dsholiday')
    .withColumn('flholiday', when(col('dsholiday').isNotNull(), 1).otherwise(0))
    .select(
        'dtregister',
        'dsday',
        'dsdayofweek',
        'nmdayofweek',
        'dsweekofyear',
        'dsmonth',
        'nmmonth',
        'dsyear',
        'dsquarter',
        'flholiday',
        'dsholiday',
    )
    .distinct()
    .withColumn('dtload', to_date(lit(f'{dtcarga}')))
)


### Fact Table

In [15]:
# Fact table: the sales measure (qtvolume) with the load date and the keys shared
# with the dimensions. Rows missing the measure or the brand key are discarded.
df_fact = (
    df
    .select('dtload', 'dtregister', 'cdbrand', 'dschannel', 'dsregion', 'qtvolume')
    .na.drop(subset=['qtvolume', 'cdbrand'])
)

### Persist Gold

In [16]:
# Persist the location dimension to the gold layer (full overwrite, partitioned by load date).
(
    df_dim_location
    .na.drop()
    .dropDuplicates()
    .write
    .format('delta')
    .mode('overwrite')
    .partitionBy("dtload")
    .save(f'{gold}sales_location')
)

In [17]:
# Persist the product dimension to the gold layer (full overwrite, partitioned by load date).
(
    df_dim_product
    .na.drop()
    .dropDuplicates()
    .write
    .format('delta')
    .mode('overwrite')
    .partitionBy("dtload")
    .save(f'{gold}sales_product')
)

In [18]:
# Persist the time dimension to the gold layer (full overwrite, partitioned by load date).
# dsholiday is NULL for every non-holiday date (left join with the holiday table),
# so a plain na.drop() (how='any') would discard all ordinary days and keep only
# holidays. Only rows with NULLs in the other columns are dropped.
(
    df_dim_time
    .na.drop(subset=[c for c in df_dim_time.columns if c != 'dsholiday'])
    .dropDuplicates()
    .write
    .format('delta')
    .mode('overwrite')
    .partitionBy("dtload")
    .save(f'{gold}sales_time')
)

In [19]:
# Persist the enriched, flattened sales table to the gold layer
# (full overwrite, partitioned by load date; rows with any NULL are dropped).
(
    df
    .na.drop()
    .dropDuplicates()
    .write
    .format('delta')
    .mode('overwrite')
    .partitionBy("dtload")
    .save(f'{gold}sales')
)

In [20]:
# Persist the fact table to the gold layer (full overwrite, partitioned by load date).
(
    df_fact
    .na.drop()
    .dropDuplicates()
    .write
    .format('delta')
    .mode('overwrite')
    .partitionBy("dtload")
    .save(f'{gold}sales_fact')
)