In [0]:
%python
# Input files for the sales_channel bronze layer. All three live in the same
# /Volumes directory, so the directory is defined once and each path is built from it.
INPUT_DATA_DIR = "/Volumes/workspace/sales_channel/input_data"
ORDERS_PATH = f"{INPUT_DATA_DIR}/Orders.json"
CUSTOMERS_PATH = f"{INPUT_DATA_DIR}/Customer.xlsx"
PRODUCTS_PATH = f"{INPUT_DATA_DIR}/Products.csv"

In [0]:
%python
# Make sales_channel the session's current schema so unqualified table names
# resolve there (the load cells also qualify their tables as sales_channel.*).
spark.sql(
    "USE sales_channel"
)

DataFrame[]

In [0]:
%python
from pyspark.sql.functions import col, trim
# pyspark.errors.AnalysisException is the shared base class of the classic and the
# Spark Connect (serverless) AnalysisException; the pyspark.sql.utils alias only
# matches the classic one.
from pyspark.errors import AnalysisException

## Raw JSON field names -> lower-case, underscore-separated bronze column names.
ORDERS_COLUMN_RENAMES = {
    "Row ID": "row_id",
    "Order ID": "order_id",
    "Order Date": "order_date",
    "Ship Date": "ship_date",
    "Ship Mode": "ship_mode",
    "Customer ID": "customer_id",
    "Product ID": "product_id",
    "Quantity": "quantity",
    "Price": "price",
    "Discount": "discount",
    "Profit": "profit",
}
## String columns whose leading/trailing whitespace is stripped before landing.
ORDERS_TRIM_COLUMNS = ["order_id", "order_date", "ship_date", "ship_mode", "customer_id", "product_id"]

## Read the Orders JSON. The read must sit inside the try: without an explicit schema
## Spark resolves the path and infers the schema here, so this is where a missing file fails.
try:
    df_json = (spark.read
        .format("json")
        .option("multiline", "true")  # a single JSON record may span several lines
        .load(ORDERS_PATH))
except AnalysisException as e:
    # Spark reports a missing path as AnalysisException (PATH_NOT_FOUND), not
    # FileNotFoundError. Translate only that case; re-raise any other analysis error.
    if "PATH_NOT_FOUND" not in str(e) and "Path does not exist" not in str(e):
        raise
    # Fail the cell instead of printing, so later cells don't run against a stale table.
    raise FileNotFoundError(f"Error: Orders file missing at {ORDERS_PATH}") from e

## Rename the columns; withColumnRenamed is a no-op for names that are not present.
orders_bronze = df_json
for old_name, new_name in ORDERS_COLUMN_RENAMES.items():
    orders_bronze = orders_bronze.withColumnRenamed(old_name, new_name)

## Trim strings. Trimming does not change the set of columns, so read the schema once.
present_columns = set(orders_bronze.columns)
for col_name in ORDERS_TRIM_COLUMNS:
    if col_name in present_columns:
        orders_bronze = orders_bronze.withColumn(col_name, trim(col(col_name)))

## Load (overwrite) the delta bronze_orders_landing table with the cleaned column names.
orders_bronze.write.mode("overwrite").format("delta").saveAsTable("sales_channel.bronze_orders_landing")
print("bronze_orders_landing table is loaded successfully")

bronze_orders_landing table is loaded successfully


In [0]:
%sql
-- Sanity check after the overwrite load: total rows now in the bronze orders table
select
  count(*)
from
  sales_channel.bronze_orders_landing

count(*)
9994


In [0]:
%python
## Read Customer XLSX file and load it into bronze_customers_landing table

%pip install openpyxl
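######## openpyxl (the engine pandas uses for .xlsx files) may already be installed by the Test_Create_DataBase_Test_Code notebook; the %pip install above makes this notebook self-sufficient.
######## The Excel file is read with pandas-on-Spark, and the result is converted to a Spark DataFrame before the columns are renamed and trimmed.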
from pyspark.sql.functions import col, trim
# Base AnalysisException: matches both classic and Spark Connect (serverless) errors,
# unlike the pyspark.sql.utils alias.
from pyspark.errors import AnalysisException
import pyspark.pandas as ps

## Raw XLSX header names -> lower-case, underscore-separated bronze column names.
CUSTOMERS_COLUMN_RENAMES = {
    "Customer ID": "customer_id",
    "Customer Name": "customer_name",
    "Country": "country",
    "City": "city",
    "State": "state",
    "Segment": "segment",
    "Region": "region",
    "Postal Code": "postal_code",
}
## String columns whose leading/trailing whitespace is stripped before landing.
## NOTE(review): city and state are renamed but not trimmed - confirm this is intended.
CUSTOMERS_TRIM_COLUMNS = ["customer_id", "customer_name", "country", "segment", "region", "postal_code"]

## Read the Customers workbook inside the try so a missing file is actually caught.
try:
    ps_df = ps.read_excel(
        CUSTOMERS_PATH,
        sheet_name="Worksheet",  # the sheet in the input Customers XLSX file is named Worksheet
        header=0,                # first row holds the column names
        dtype=str,               # keep every value as text (e.g. postal codes are not parsed as numbers)
    )
except (AnalysisException, FileNotFoundError) as e:
    # pandas-on-Spark may load a path string through Spark, in which case a missing file
    # surfaces as AnalysisException (PATH_NOT_FOUND); any other analysis error is re-raised as-is.
    if isinstance(e, AnalysisException) and "PATH_NOT_FOUND" not in str(e) and "Path does not exist" not in str(e):
        raise
    # Fail the cell instead of printing, so later cells don't run against a stale table.
    raise FileNotFoundError(f"Error: Customers file missing at {CUSTOMERS_PATH}") from e

## Convert to a Spark DataFrame; to_spark() without index_col drops the pandas-on-Spark index.
spark_df_customers = ps_df.to_spark()

## Rename the columns; withColumnRenamed is a no-op for names that are not present.
customers_bronze = spark_df_customers
for old_name, new_name in CUSTOMERS_COLUMN_RENAMES.items():
    customers_bronze = customers_bronze.withColumnRenamed(old_name, new_name)

## Trim strings. Trimming does not change the set of columns, so read the schema once.
present_columns = set(customers_bronze.columns)
for col_name in CUSTOMERS_TRIM_COLUMNS:
    if col_name in present_columns:
        customers_bronze = customers_bronze.withColumn(col_name, trim(col(col_name)))

## Load (overwrite) the delta bronze_customers_landing table with the cleaned column names.
customers_bronze.write.mode("overwrite").format("delta").saveAsTable("sales_channel.bronze_customers_landing")
print("bronze_customers_landing table is loaded successfully")

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


Tried to attach usage logger `pyspark.databricks.pandas.usage_logger`, but an exception was raised: JVM wasn't initialised. Did you call it on executor side?


bronze_customers_landing table is loaded successfully


In [0]:
%sql
-- Spot-check the landed customers. The table holds contact details (email, phone,
-- address), so show a small sample instead of the whole table.
select *
from sales_channel.bronze_customers_landing
limit 10

customer_id,customer_name,email,phone,address,segment,country,city,state,postal_code,region
PW-19240,Pierre Wener,bettysullivan808@gmail.com,421.580.0902x9815,"001 Jones Ridges Suite 338 Johnsonfort, FL 95462",Consumer,United States,Louisville,Colorado,80027,West
GH-14410,Gary567 Hansen,austindyer948@gmail.com,001-542-415-0246x314,"00347 Murphy Unions Ashleyton, IA 29814",Home Office,United States,Chicago,Illinois,60653,Central
KL-16555,Kelly Lampkin,clarencehughes280@gmail.com,7185624866,"007 Adams Lane Suite 176 East Amyberg, IN 34581",Corporate,United States,Colorado Springs,Colorado,80906,West
AH-10075,Ad. ..am Hart,angelabryant256@gmail.com,265.101.5569x1098,"01454 Christopher Turnpike North Ryanstad, MI 36226",Corporate,United States,Columbus,Ohio,43229,East
PF-19165,Philip Fox,kristinereynolds576@gmail.com,001-473-645-2141x9154,"0158 Harris Ways Suite 085 East Laceyside, SD 35649",Consumer,United States,San Diego,California,92105,West
SC-20680,Steve Carroll,jasoncontreras178@gmail.com,(563)647-4830x5318,"01630 Tammy Prairie North Daniel, KS 26404",Home Office,United States,Seattle,Washington,98105,West
JR-15700,Jocasta Rupert,johncombs689@gmail.com,-6181,"019 Emily Corner Apt. 810 Ryantown, SC 37010",Consumer,United States,Jacksonville,Florida,32216,South
AB-10105,Adrian Barton,daviddavis980@gmail.com,067.435.8553x692,"021 Katherine Mall Jameston, DC 24685",Consumer,United States,Phoenix,Arizona,85023,West
PT-19090,Pete@#$ Takahito,mikaylaarnold666@gmail.com,786.638.6820,"0236 Lane Squares Port Samantha, ME 15670",Consumer,United States,San Antonio,Texas,78207,Central
SG-20605,Speros Goranitis,brianjoyce110@gmail.com,3528465094,"02401 Angela Loop Apt. 678 Port John, ME 43448",Consumer,United States,Lafayette,Indiana,47905,Central


In [0]:
%python
from pyspark.sql.functions import col, trim
# Base AnalysisException: matches both classic and Spark Connect (serverless) errors,
# unlike the pyspark.sql.utils alias.
from pyspark.errors import AnalysisException

# PRODUCTS_PATH is defined once, in the path-configuration cell at the top of the notebook.

## Raw CSV header names -> lower-case, underscore-separated bronze column names.
PRODUCTS_COLUMN_RENAMES = {
    "Product ID": "product_id",
    "Category": "category",
    "Sub-Category": "sub_category",
    "Product Name": "product_name",
    "State": "state",
    "Price per product": "unit_price",
}
## String columns whose leading/trailing whitespace is stripped before landing.
PRODUCTS_TRIM_COLUMNS = ["product_id", "product_name", "category", "sub_category"]

## Read the Products CSV inside the try so a missing file is actually caught.
## Some product names contain commas and double quotes (e.g. 8-1/2" x 11"), so such
## fields are wrapped in double quotes and an embedded quote is written as "".
try:
    df_products = (spark.read
        .format("csv")
        .option("header", "true")       # first line holds the column names
        .option("inferSchema", "true")
        .option("quote", "\"")          # fields containing commas are wrapped in double quotes
        .option("escape", "\"")         # a doubled "" inside a quoted field is a literal quote
        .option("multiLine", "true")    # allow quoted fields to span line breaks
        .load(PRODUCTS_PATH))
except AnalysisException as e:
    # Spark reports a missing path as AnalysisException (PATH_NOT_FOUND), not
    # FileNotFoundError. Translate only that case; re-raise any other analysis error.
    if "PATH_NOT_FOUND" not in str(e) and "Path does not exist" not in str(e):
        raise
    # Fail the cell instead of printing, so later cells don't run against a stale table.
    raise FileNotFoundError(f"Error: Products file missing at {PRODUCTS_PATH}") from e

## Rename the columns; withColumnRenamed is a no-op for names that are not present.
products_bronze = df_products
for old_name, new_name in PRODUCTS_COLUMN_RENAMES.items():
    products_bronze = products_bronze.withColumnRenamed(old_name, new_name)

## Trim strings. Trimming does not change the set of columns, so read the schema once.
present_columns = set(products_bronze.columns)
for col_name in PRODUCTS_TRIM_COLUMNS:
    if col_name in present_columns:
        products_bronze = products_bronze.withColumn(col_name, trim(col(col_name)))

## inferSchema may not pick double for the price column; force it so the type is stable.
products_bronze = products_bronze.withColumn("unit_price", col("unit_price").cast("double"))

## Load (overwrite) the delta bronze_products_landing table with the cleaned column names.
products_bronze.write.mode("overwrite").format("delta").saveAsTable("sales_channel.bronze_products_landing")
print("bronze_products_landing table is loaded successfully")

bronze_products_landing table is loaded successfully


In [0]:
%sql
-- Spot-check the landed products: a small sample is enough to verify the column names,
-- the quoted product names and the unit_price values.
select *
from sales_channel.bronze_products_landing
limit 10

product_id,category,sub_category,product_name,state,unit_price
FUR-CH-10002961,Furniture,Chairs,"Leather Task Chair, Black",New York,81.882
TEC-AC-10004659,Technology,Accessories,Imation Secure+ Hardware Encrypted USB 2.0 Flash Drive; 16GB,Oklahoma,72.99
OFF-BI-10002824,Office Supplies,Binders,Recycled Easel Ring Binders,Colorado,4.25
OFF-PA-10003349,Office Supplies,Paper,Xerox 1957,Florida,5.184
TEC-AC-10003023,Technology,Accessories,Logitech G105 Gaming Keyboard,Ohio,47.496
OFF-BI-10004233,Office Supplies,Binders,"GBC Pre-Punched Binding Paper, Plastic, White, 8-1/2"" x 11""",New Jersey,15.99
OFF-PA-10004470,Office Supplies,Paper,"Adams Write n' Stick Phone Message Book, 11"" X 5 1/4"", 200 Messages",New York,5.5
FUR-FU-10001196,Furniture,Furnishings,DAX Cubicle Frames - 8x10,Indiana,5.77
OFF-ST-10000585,Office Supplies,Storage,Economy Rollaway Files,Kentucky,165.2
OFF-ST-10003996,Office Supplies,Storage,"Letter/Legal File Tote with Clear Snap-On Lid, Black Granite",Washington,16.06
