## Iterating over each folder

### Getting names of tables in Silver folder

In [0]:
# Collect the entry names found under the silver SalesLT mount.
# Only the text before the first '/' of each entry name is kept
# (presumably to drop the trailing slash of directory entries - confirm).
table_name = [entry.name.split('/')[0] for entry in dbutils.fs.ls('mnt/silver/SalesLT/')]
print(table_name)

['Address', 'Customer', 'CustomerAddress', 'Product', 'ProductCategory', 'ProductDescription', 'ProductModel', 'ProductModelProductDescription', 'SalesOrderDetail', 'SalesOrderHeader']


### Creating temp views of the tables to run the preprocessing in SQL

In [0]:
# Register every silver table as a temp view named after the table, so it can
# be referenced by name from Spark SQL, and print a preview of its rows.
for name in table_name:
    path = '/mnt/silver/SalesLT/' + name
    df = spark.read.format('delta').load(path)
    df.createOrReplaceTempView(name)
    df.show()

+---------+--------------------+------------+-----------+----------------+-------------+----------+--------------------+------------+
|AddressID|        AddressLine1|AddressLine2|       City|   StateProvince|CountryRegion|PostalCode|             rowguid|ModifiedDate|
+---------+--------------------+------------+-----------+----------------+-------------+----------+--------------------+------------+
|        9|   8713 Yosemite Ct.|     unknown|    Bothell|      Washington|United States|     98011|268af621-76d7-4c7...|  2006-07-01|
|       11| 1318 Lasalle Street|     unknown|    Bothell|      Washington|United States|     98011|981b3303-aca2-49c...|  2007-04-01|
|       25|    9178 Jumping St.|     unknown|     Dallas|           Texas|United States|     75201|c8df3bd9-48f0-465...|  2006-09-01|
|       28|    9228 Via Del Sol|     unknown|    Phoenix|         Arizona|United States|     85004|12ae5ee1-fc3e-468...|  2005-09-01|
|       32|   26910 Indela Road|     unknown|   Montreal|     

### Creating Dim and Fact Tables

In [0]:
# Product dimension.
# - ProductId is a generated surrogate key (row_number over the source id);
#   the source key is kept as ProductCod.
# - ProductCategory is joined twice: PC is the product's own category
#   (exposed as SubCategory) and PC1 is PC's parent (exposed as Category).
# The query is kept in a single place so the readable form and the executed
# form cannot drift apart.
dim_product_query = """
    SELECT
        row_number() OVER (ORDER BY P.ProductId) AS ProductId,
        P.ProductId   AS ProductCod,
        P.Name,
        P.ProductNumber,
        PC1.Name      AS Category,
        PC.Name       AS SubCategory,
        PM.Name       AS Model,
        P.Color,
        P.StandardCost,
        P.ListPrice
    FROM Product P
    LEFT JOIN ProductCategory PC  ON PC.ProductCategoryID = P.ProductCategoryID
    LEFT JOIN ProductCategory PC1 ON PC.ParentProductCategoryID = PC1.ProductCategoryID
    LEFT JOIN ProductModel PM     ON PM.ProductModelID = P.ProductModelID
"""
dimProduct_df = spark.sql(dim_product_query)
dimProduct_df.createOrReplaceTempView("dimProduct")


In [0]:
# Customer dimension, one row per customer/address pair (customers without an
# address keep a single row with NULL address columns because of the LEFT JOINs).
# - CustomerId is a generated surrogate key; the source key is kept as CustomerCod.
# - The window is ordered by (CustomerID, AddressID): a customer can have several
#   addresses, and ordering by CustomerID alone leaves ties whose numbering may
#   differ each time this lazily evaluated view is recomputed (e.g. when it is
#   joined into the fact table vs. when it is written out).
# - concat_ws skips NULL name parts; concat would return NULL for the whole
#   Fullname as soon as one part is NULL.
dim_customer_query = """
    SELECT
        row_number() OVER (ORDER BY C.CustomerID, CA.AddressID) AS CustomerId,
        C.CustomerID  AS CustomerCod,
        concat_ws(' ', C.Title, C.FirstName, C.MiddleName, C.LastName) AS Fullname,
        C.CompanyName,
        C.EmailAddress,
        C.Phone,
        CA.AddressType,
        A.AddressID   AS AddressCod,
        A.AddressLine1,
        A.City,
        A.StateProvince,
        A.CountryRegion,
        A.PostalCode
    FROM Customer C
    LEFT JOIN CustomerAddress CA ON C.CustomerID = CA.CustomerID
    LEFT JOIN Address A          ON A.AddressID = CA.AddressID
"""
dimCustomer_df = spark.sql(dim_customer_query)
dimCustomer_df.createOrReplaceTempView("dimCustomer")

In [0]:
# Orders fact table, one row per order line (SalesOrderHeader x SalesOrderDetail).
# - SalesOrderId is a generated surrogate key; the source order id is kept as
#   SalesOrderCod.
# - CustomerId and ProductID are the surrogate keys of dimCustomer / dimProduct,
#   resolved through their *Cod (source key) columns.
# - The customer row is the one whose address equals the order's ShipToAddressID.
#   ShipToAddressID is left unqualified as in the original query; presumably it
#   is a SalesOrderHeader column - confirm before qualifying it.
# - SOD.ProductID is a secondary sort key so lines of the same order are numbered
#   in a stable order. This assumes a product appears at most once per order -
#   TODO confirm, otherwise use a line-level key instead.
fact_orders_query = """
    SELECT
        row_number() OVER (ORDER BY SOH.SalesOrderID, SOD.ProductID) AS SalesOrderId,
        SOH.SalesOrderID AS SalesOrderCod,
        SOH.OrderDate,
        SOH.DueDate,
        SOH.ShipDate,
        SOH.SalesOrderNumber,
        C.CustomerId,
        SOH.ShipMethod,
        SOH.SubTotal,
        SOH.TaxAmt,
        SOH.Freight,
        SOH.TotalDue,
        P.ProductID,
        SOD.OrderQty,
        SOD.UnitPrice,
        SOD.UnitPriceDiscount,
        SOD.LineTotal
    FROM SalesOrderHeader SOH
    INNER JOIN SalesOrderDetail SOD ON SOD.SalesOrderID = SOH.SalesOrderID
    INNER JOIN dimProduct P         ON P.ProductCod = SOD.ProductID
    INNER JOIN dimCustomer C        ON C.CustomerCod = SOH.CustomerID
                                   AND C.AddressCod = ShipToAddressID
"""
factOrders_df = spark.sql(fact_orders_query)
factOrders_df.createOrReplaceTempView("factOrders")

#### Logic for creating dimDate, implemented in Scala

In [0]:

%scala
//Taken from
//https://www.linkedin.com/pulse/how-create-date-dimension-using-databricks-aakash-jain/
// One row of the date dimension.
// Field order matters: dim_date_generator builds instances positionally, and
// create_final_df selects the columns by these exact names.
// All date-valued fields are carried as "yyyy-MM-dd" strings here and are cast
// to the date type only when the final DataFrame is assembled.
case class dim_date_schema(
                       // Integer form of the date, rendered with the pattern yyyyMMdd.
                       date_key: Int,
                       date: String,
                       day: Int,
                       // Ordinal suffix for the day of month: "st", "nd", "rd" or "th".
                       day_suffix: String,
                       week_day: Int,
                       week_day_name: String,
                       week_day_name_short: String,
                       week_day_name_first_letter: String,
                       day_of_year: Int,
                       week_of_month: Int,
                       week_of_year: Int,
                       month: Int,
                       month_name: String,
                       month_name_short: String,
                       month_name_first_letter: String,
                       quarter: Int,
                       // "Q1" .. "Q4".
                       quarter_name: String,
                       year: Int,
                       yyyymm: String,
                       month_year: String,
                       // 1 for Saturday/Sunday, otherwise 0.
                       is_weekend: Int,
                       // Currently always populated with 0 by dim_date_generator.
                       is_holiday: Int,
                       first_date_of_year: String,
                       last_date_of_year: String,
                       first_date_of_quarter: String,
                       last_date_of_quarter: String,
                       first_date_of_month: String,
                       last_date_of_month: String,
                       first_date_of_week: String,
                       last_date_of_week: String,
                       // 1/0 flags computed relative to the day the notebook runs (LocalDate.now).
                       last_12_month_flag: Int,
                       last_6_month_flag: Int,
                       last_month_flag: Int
                     )

In [0]:
%scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import scala.collection.mutable.ListBuffer
 
/**
 * String-based calendar helpers used to build the date dimension.
 *
 * Unless stated otherwise, every `current_date` argument is a date string in
 * `default_format` ("yyyy-MM-dd") and every returned date is a string in that
 * same format. Invalid date strings make the underlying java.time parser throw.
 */
object date_time_utils {
  import java.time.DayOfWeek

  val default_format = "yyyy-MM-dd"

  // Formatter for default_format, built once instead of on every call.
  private val default_formatter = DateTimeFormatter.ofPattern(default_format)

  // Parses a default_format string into a LocalDate.
  private def parse_default(value: String): LocalDate =
    LocalDate.parse(value, default_formatter)

  /** Returns true when `end_date` is strictly after `start_date`. */
  def check(start_date: String, end_date: String): Boolean = {
    val start = parse_default(start_date)
    val end = parse_default(end_date)
    end.isAfter(start)
  }

  /**
   * Parses `current_date` with the pattern `input_format` and renders it again
   * with the pattern `output_format` (both are DateTimeFormatter patterns).
   * Text output (day/month names) and week-based fields follow the JVM's
   * default locale because the formatters are created without an explicit one.
   */
  def convert_string_to_date(current_date: String, input_format: String, output_format: String): String = {
    val input_formatter = DateTimeFormatter.ofPattern(input_format)
    val output_formatter = DateTimeFormatter.ofPattern(output_format)
    output_formatter.format(input_formatter.parse(current_date))
  }

  /** Ordinal suffix of the day of month: "st", "nd", "rd" or "th". */
  def get_day_suffix(current_date: String): String = {
    parse_default(current_date).getDayOfMonth match {
      case 1 | 21 | 31 => "st"
      case 2 | 22 => "nd"
      case 3 | 23 => "rd"
      case _ => "th" // includes 11, 12 and 13
    }
  }

  /** Quarter label of the date: "Q1" .. "Q4". */
  def get_quater_name(current_date: String): String =
    "Q" + convert_string_to_date(current_date, default_format, "Q")

  /**
   * Returns 1 when the date falls on a Saturday or Sunday, otherwise 0.
   * Uses the DayOfWeek enum rather than the formatted day name, so the result
   * does not depend on the JVM's default locale.
   */
  def is_weekend(current_date: String): Int = {
    parse_default(current_date).getDayOfWeek match {
      case DayOfWeek.SATURDAY | DayOfWeek.SUNDAY => 1
      case _ => 0
    }
  }

  /**
   * First ("first") or last ("last") day of the date's year.
   * Any other `position` value raises a scala.MatchError.
   */
  def get_date_of_year(current_date: String, position: String): String = {
    val year = convert_string_to_date(current_date, default_format, "u")
    position match {
      case "first" => year + "-01-01"
      case "last" => year + "-12-31"
    }
  }

  /** First day of the quarter that contains the date. */
  def get_first_date_of_quarter(current_date: String): String = {
    val quater = convert_string_to_date(current_date, default_format, "QQ")
    val year = convert_string_to_date(current_date, default_format, "u")
    quater match {
      case "01" => year + "-01-01"
      case "02" => year + "-04-01"
      case "03" => year + "-07-01"
      case "04" => year + "-10-01"
    }
  }

  /** Last day of the quarter that contains the date. */
  def get_last_date_of_quarter(current_date: String): String = {
    val quater = convert_string_to_date(current_date, default_format, "QQ")
    val year = convert_string_to_date(current_date, default_format, "u")
    quater match {
      case "01" => year + "-03-31"
      case "02" => year + "-06-30"
      case "03" => year + "-09-30"
      case "04" => year + "-12-31"
    }
  }

  /** First day of the month that contains the date. */
  def get_first_date_of_month(current_date: String): String = {
    val month = convert_string_to_date(current_date, default_format, "MM")
    val year = convert_string_to_date(current_date, default_format, "u")
    year + "-" + month + "-01"
  }

  /** Last day of the month that contains the date (leap years included). */
  def get_last_date_of_month(current_date: String): String = {
    val converted_date = parse_default(current_date)
    converted_date.withDayOfMonth(converted_date.lengthOfMonth()).toString
  }

  /**
   * First day of the week that contains the date.
   * The day-of-week number comes from the "e" pattern (localized day-of-week),
   * so which weekday counts as day 1 depends on the JVM's default locale.
   */
  def get_first_date_of_week(current_date: String): String = {
    val converted_date = parse_default(current_date)
    val day_backward = convert_string_to_date(current_date, default_format, "e").toInt - 1
    converted_date.minusDays(day_backward).toString
  }

  /**
   * Last day of the week that contains the date.
   * Same locale-dependent week numbering as get_first_date_of_week.
   */
  def get_last_date_of_week(current_date: String): String = {
    val converted_date = parse_default(current_date)
    val day_forward = 7 - convert_string_to_date(current_date, default_format, "e").toInt
    converted_date.plusDays(day_forward).toString
  }

  /**
   * The 12 months preceding the current month as "yyyyMM" strings,
   * most recent first. The current month itself is not included.
   */
  def get_last_12_month_list(): ListBuffer[String] = {
    val month_formatter = DateTimeFormatter.ofPattern("yyyyMM")
    // Read the clock once so all 12 entries are derived from the same day.
    val today = LocalDate.now
    val last_12_month_list = ListBuffer[String]()
    for (months_back <- 1 to 12) {
      last_12_month_list += month_formatter.format(today.minusMonths(months_back))
    }
    last_12_month_list
  }

  /** 1 when `yyyyMM` is one of the 12 months before the current month, else 0. */
  def get_last_12_month_flag(yyyyMM: String): Int = {
    if (get_last_12_month_list().contains(yyyyMM)) 1 else 0
  }

  /** 1 when `yyyyMM` is one of the 6 months before the current month, else 0. */
  def get_last_6_month_flag(yyyyMM: String): Int = {
    if (get_last_12_month_list().take(6).contains(yyyyMM)) 1 else 0
  }

  /** 1 when `yyyyMM` is the month immediately before the current month, else 0. */
  def get_last_month_flag(yyyyMM: String): Int = {
    if (get_last_12_month_list().head.equals(yyyyMM)) 1 else 0
  }

  /** Last day of the month that lies `plus_month` months after today. */
  def get_calendar_end_date(plus_month: Int): String = {
    get_last_date_of_month(default_formatter.format(LocalDate.now.plusMonths(plus_month)))
  }
}

In [0]:
%scala
// Builds the date-dimension record for a single day.
// current_date must be a "yyyy-MM-dd" string; every attribute is derived from
// it through date_time_utils. is_holiday is always set to 0.
def dim_date_generator(current_date: String): dim_date_schema = {
  val default_format = "yyyy-MM-dd"

  // Re-renders current_date using the given DateTimeFormatter pattern.
  def render(pattern: String): String =
    date_time_utils.convert_string_to_date(current_date, default_format, pattern)

  // Needed both as a column value and as input to the three "last N months" flags.
  val year_month = render("yyyyMM")

  dim_date_schema(
    date_key = render("yyyyMMdd").toInt,
    date = current_date,
    day = render("d").toInt,
    day_suffix = date_time_utils.get_day_suffix(current_date),
    week_day = render("e").toInt,
    week_day_name = render("EEEE"),
    week_day_name_short = render("E").toUpperCase,
    week_day_name_first_letter = render("E").substring(0, 1),
    day_of_year = render("D").toInt,
    week_of_month = render("W").toInt,
    week_of_year = render("w").toInt,
    month = render("M").toInt,
    month_name = render("MMMM"),
    month_name_short = render("MMM").toUpperCase,
    month_name_first_letter = render("MMM").substring(0, 1),
    quarter = render("Q").toInt,
    quarter_name = date_time_utils.get_quater_name(current_date),
    year = render("u").toInt,
    yyyymm = year_month,
    month_year = render("yyyy MMM").toUpperCase,
    is_weekend = date_time_utils.is_weekend(current_date),
    is_holiday = 0,
    first_date_of_year = date_time_utils.get_date_of_year(current_date, "first"),
    last_date_of_year = date_time_utils.get_date_of_year(current_date, "last"),
    first_date_of_quarter = date_time_utils.get_first_date_of_quarter(current_date),
    last_date_of_quarter = date_time_utils.get_last_date_of_quarter(current_date),
    first_date_of_month = date_time_utils.get_first_date_of_month(current_date),
    last_date_of_month = date_time_utils.get_last_date_of_month(current_date),
    first_date_of_week = date_time_utils.get_first_date_of_week(current_date),
    last_date_of_week = date_time_utils.get_last_date_of_week(current_date),
    last_12_month_flag = date_time_utils.get_last_12_month_flag(year_month),
    last_6_month_flag = date_time_utils.get_last_6_month_flag(year_month),
    last_month_flag = date_time_utils.get_last_month_flag(year_month)
  )
}

In [0]:
%scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame
import java.text.SimpleDateFormat
import java.util.Date

/**
 * Builds the date dimension covering every day from the earliest OrderDate to
 * the latest DueDate (both inclusive) found in the `factOrders` temp view.
 *
 * @return one row per calendar day with the dim_date_schema columns, the
 *         date-valued columns cast to the date type, plus a `load_date`
 *         column holding the current timestamp.
 * @throws IllegalStateException when `factOrders` yields no usable
 *         OrderDate/DueDate (for example because the view is empty).
 */
def create_final_df(): DataFrame = {

  // Fetch both bounds in a single Spark job. Casting to date and then to string
  // inside Spark yields "yyyy-MM-dd" text whether the source columns are stored
  // as strings, dates or timestamps, so no unchecked cast of the Row value is needed.
  val bounds = spark.table("factOrders")
    .agg(
      min(col("OrderDate")).cast("date").cast("string"),
      max(col("DueDate")).cast("date").cast("string"))
    .head()

  if (bounds.isNullAt(0) || bounds.isNullAt(1)) {
    throw new IllegalStateException(
      "Cannot build dimDate: factOrders has no usable OrderDate/DueDate values")
  }

  val first_day = LocalDate.parse(bounds.getString(0))
  val last_day = LocalDate.parse(bounds.getString(1))

  // One record per day, first_day through last_day inclusive.
  // An empty list results when first_day is after last_day.
  val dim_date_list = Iterator
    .iterate(first_day)(_.plusDays(1))
    .takeWhile(day => !day.isAfter(last_day))
    .map(day => dim_date_generator(day.toString))
    .toList

  val dim_date_df = spark.createDataset(dim_date_list)
  dim_date_df.select(col("date_key")
    , col("date").cast("date")
    , col("day")
    , col("day_suffix")
    , col("week_day")
    , col("week_day_name")
    , col("week_day_name_short")
    , col("week_day_name_first_letter")
    , col("day_of_year")
    , col("week_of_month")
    , col("week_of_year")
    , col("month")
    , col("month_name")
    , col("month_name_short")
    , col("month_name_first_letter")
    , col("quarter")
    , col("quarter_name")
    , col("year")
    , col("yyyymm")
    , col("month_year")
    , col("is_weekend")
    , col("is_holiday")
    , col("first_date_of_year").cast("date")
    , col("last_date_of_year").cast("date")
    , col("first_date_of_quarter").cast("date")
    , col("last_date_of_quarter").cast("date")
    , col("first_date_of_month").cast("date")
    , col("last_date_of_month").cast("date")
    , col("first_date_of_week").cast("date")
    , col("last_date_of_week").cast("date")
    , col("last_12_month_flag")
    , col("last_6_month_flag")
    , col("last_month_flag")
    , current_timestamp().as("load_date"))
}

### Saving tables into storage

In [0]:
%scala
import org.apache.spark.sql.SaveMode
// Build the date dimension and persist it to the gold layer as a Delta table,
// replacing whatever is already stored at output_path.
val output_path = "/mnt/gold/SalesLT/dimDate/"
val df_date = create_final_df()
df_date
  .write
  .format("delta")
  .mode(SaveMode.Overwrite)
  .save(output_path)


In [0]:
# Persist the dimension and fact DataFrames to the gold layer as Delta tables,
# overwriting any previous contents at each target path.
gold_targets = {
    "/mnt/gold/SalesLT/dimCustomer/": dimCustomer_df,
    "/mnt/gold/SalesLT/dimProduct/": dimProduct_df,
    "/mnt/gold/SalesLT/factOrders/": factOrders_df,
}
for gold_path, gold_df in gold_targets.items():
    gold_df.write.format('delta').mode("overwrite").save(gold_path)