In [0]:
# Azure storage account name interpolated into the abfss:// URLs built in later cells.
storage_account = "databricks180820251"


In [0]:
# List the entries at the root of the "bronze" container and show the listing.
bronze_root = f"abfss://bronze@{storage_account}.dfs.core.windows.net/"
files = dbutils.fs.ls(bronze_root)
files

In [0]:
# Make hive_metastore the current catalog for the SQL statements that follow.
spark.sql("USE CATALOG hive_metastore")

In [0]:
# Inspect the Python type of the listing returned by dbutils.fs.ls.
type(files)

In [0]:
# Show the path of the first listed entry.
display(files[0].path)

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType, FloatType


In [0]:
# Schema applied when reading the orders file; every column is nullable.
orders_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("order_id", IntegerType()),
        ("order_date", TimestampType()),
        ("customer_id", IntegerType()),
        ("order_status", StringType()),
    )
])

In [0]:
# Schema applied when reading the departments file; both columns are nullable.
department_schema = StructType(
    [
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True),
    ]
)

In [0]:
# Schema applied when reading the categories file; every column is nullable.
# The second column was misspelled "department_d"; because the file is read
# without a header, that misspelling would become the table's column name.
category_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("department_id", IntegerType(), True),
    StructField("name", StringType(), True)])

In [0]:
# Schema applied when reading the customers file: an integer id followed by
# string columns; every column is nullable.
customers_schema = StructType(
    [StructField("id", IntegerType(), True)]
    + [
        StructField(text_column, StringType(), True)
        for text_column in (
            "Fname",
            "Lname",
            "Email",
            "Mobile",
            "Street",
            "City",
            "State",
            "ZipCode",
        )
    ]
)
# Schema applied when reading the products file; every column is nullable.
# Description and Image were declared as IntegerType, which cannot hold text.
# NOTE(review): typed as strings on the assumption that they carry free text /
# an image reference — confirm against the source file.
products_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("Category_id", IntegerType(), True),
    StructField("Name", StringType(), True),
    StructField("Description", StringType(), True),
    StructField("Price", FloatType(), True),
    StructField("Image", StringType(), True)])

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType
# Schema applied when reading the orderItems file: four integer columns
# followed by two double columns; every column is nullable.
oi_schema = StructType(
    [
        StructField(int_column, IntegerType(), True)
        for int_column in ("id", "order_id", "product_id", "quantity")
    ]
    + [
        StructField(amount_column, DoubleType(), True)
        for amount_column in ("total", "price")
    ]
)

In [0]:
# Lookup from a bronze file's base name (without ".csv") to its read schema.
schema_mapping = {
    "orders": orders_schema,
    "orderItems": oi_schema,
    "departments": department_schema,
    "categories": category_schema,
    "customers": customers_schema,
    "products": products_schema,
}

In [0]:

# Load every listed bronze file into a table in `retail_test`, with the table
# data stored under the "silver" container.
databasename = spark.sql("CREATE DATABASE IF NOT EXISTS retail_test")
database = "retail_test"
for table in files:
    print(table.name)
    print(table.path)
    tableName = table.name.replace(".csv", "")

    # Skip entries with no declared schema instead of failing with KeyError.
    table_schema = schema_mapping.get(tableName)
    if table_schema is None:
        print(f"Skipping {table.name}: no schema registered in schema_mapping")
        continue

    # Only the products file is read with header=True; all others have no header row.
    df = spark.read.csv(table.path, header=(tableName == "products"), schema=table_schema)

    display(tableName)
    spark.sql(f"DROP TABLE IF EXISTS {database}.{tableName}")
    # The original wrote `df.write.format.mode(...)`, which never calls
    # `format` and raises AttributeError.
    # NOTE(review): "delta" is assumed because it is the Databricks default
    # table format — confirm.
    (
        df.write.format("delta")
        .mode("overwrite")
        .option("path", f"abfss://silver@{storage_account}.dfs.core.windows.net/retail_test/{tableName}")
        .option("mergeSchema", "true")
        .saveAsTable(f"{database}.{tableName}")
    )



In [0]:
%sql
-- Make retail_test the current database, then list its tables.
USE retail_test;
SHOW TABLES;

In [0]:
# Load the whole orders table and render it.
orders = spark.sql("select * from retail_test.orders")
display(orders)

In [0]:
# Project two columns by name.
sel = orders.select(
    "order_id",
    "order_status",
)
sel.display()

In [0]:
# Same kind of projection, this time passing Column objects instead of names.
sle_col = orders.select(
    orders["order_id"],
    orders["order_date"],
    orders["order_status"],
)
sle_col.display()

In [0]:
# Qualify the table with its database so the cell does not depend on an
# earlier `USE retail_test` having been run in this session.
products = spark.sql("select * from retail_test.products")

In [0]:
from pyspark.sql.functions import col, round
# Add the price after a 10% reduction, rounded to two decimals.
discount = products.withColumn(
    "DiscountedPrice",
    round(col("Price") - col("Price") * .1, 2),
)
display(discount)

In [0]:
from pyspark.sql.functions import lit
# Qualify the table with its database so the cell does not depend on an
# earlier `USE retail_test` having been run in this session.
# The (misspelled) name `custoemrs` is kept because a later cell reads it.
custoemrs = spark.sql("select * from retail_test.customers")
# Add a constant "country" column to every row.
withCountry = custoemrs.withColumn("country", lit("USA"))
withCountry.display()


In [0]:
# Rename the two name columns; all other columns are left untouched.
cust = (
    custoemrs
    .withColumnRenamed("Fname", "FirstName")
    .withColumnRenamed("Lname", "LastName")
)
display(cust)



In [0]:
# Qualify the table with its database so the cell does not depend on an
# earlier `USE retail_test` having been run in this session.
orders = spark.sql("select * from retail_test.orders")

In [0]:
# Keep the DataFrame in the variable; assigning the result of `.display()`
# stored None instead of the distinct statuses.
distictorders = orders.select("order_status").distinct()
distictorders.display()


In [0]:
# Keep the DataFrame in the variable; assigning the result of `.display()`
# stored None instead of the filtered rows.
canceldorders = orders.filter("order_status = 'CANCELED'")
canceldorders.display()

In [0]:
# Keep rows whose status is any one of three values, OR-ing Column comparisons.
completedorders = orders.filter(
    (orders["order_status"] == 'COMPLETE')
    | (orders["order_status"] == 'CANCELED')
    | (orders["order_status"] == 'CLOSED')
)
display(completedorders)

In [0]:
# Distinct statuses that remain in completedorders.
completedorders.select("order_status").distinct().display()

In [0]:
# Membership test against a list of statuses.
L = [
    "COMPLETE",
    "CLOSED",
    "CANCELED",
]
display(orders.filter(orders["order_status"].isin(L)))

In [0]:
%sql
-- Show every row of the products table.
SELECT *
FROM retail_test.products

In [0]:
# Qualify the table with its database so the cell does not depend on an
# earlier `USE retail_test` having been run in this session.
productsdf = spark.sql("select * from retail_test.products")
productsdf.display()


In [0]:
# Products whose Name contains "Nike" anywhere.
nike_products = productsdf.filter(productsdf["Name"].like('%Nike%'))
display(nike_products)


In [0]:
# Products whose Name starts with "Nike".
nike_products1 = productsdf.filter(productsdf["Name"].like('Nike%'))
display(nike_products1)


In [0]:
# Products whose Name ends with "Nike".
# LIKE is case-sensitive, and the neighbouring cells match on "Nike"; the
# original lower-case '%nike' pattern was inconsistent with them.
nike_productsends = productsdf.filter(productsdf.Name.like('%Nike'))
nike_productsends.display()


In [0]:
from pyspark.sql.functions import when

# Two-way classification: listed statuses become "Completed", the rest "Pending".
l = ['CLOSED', 'CANCELED', 'COMPLETE']
ordrswith_data = orders.withColumn(
    "Status",
    when(orders["order_status"].isin(l), "Completed").otherwise("Pending"),
)

display(ordrswith_data)

In [0]:
# Distinct (order_status, Status) pairs, to check the derived column.
distinct = ordrswith_data.select(
    ordrswith_data["order_status"],
    ordrswith_data["Status"],
).distinct()
display(distinct)

# CASE ... WHEN ...

In [0]:
# Multi-branch classification; branches are evaluated in the order written
# and "Pending" is the fallback.
canceledList = ["CLOSED", "COMPLETE", "CANCELED"]
pendingList = ["PENDING_PAYMENT", "PAYMENT_REVIEW"]
ordercasewhendf = orders.withColumn(
    "Status",
    (
        when(orders["order_status"].isin(canceledList), "Completed")
        .when(orders["order_status"].isin(pendingList), "inprogress")
        .when(orders["order_status"] == "SUSPECTED_FRAUD", "FRAUD")
        .otherwise("Pending")
    ),
)
display(ordercasewhendf)



In [0]:
# Distinct (order_status, Status) pairs in ordercasewhendf.
ordercasewhendf.select("order_status","Status").distinct().display()

In [0]:
# DataFrame.count() returns a plain int, which has no .display() method;
# leave the value as the cell's last expression so the notebook renders it.
count = orders.count()
count

In [0]:
# Row count per order_status. The earlier select + "CountBy" column was dropped:
# groupBy(...).count() outputs only the grouping column and `count`, so that
# extra column never reached the result.
gorupbydf = orders.groupBy("order_status").count()
gorupbydf.display()

In [0]:
# The variable is named for order items but the query read `orders`;
# read the orderItems table, qualified with its database.
orderitemsdf = spark.sql("select * from retail_test.orderItems")
orderitemsdf.display()

In [0]:
%sql
-- Make retail_test the current database, then list its tables.
USE DATABASE retail_test;
SHOW TABLES;

In [0]:
# Qualify the table with its database so the cell does not depend on an
# earlier `USE retail_test` having been run in this session.
orderitemsdf = spark.sql("select * from retail_test.orderItems")
orderitemsdf.display()


In [0]:
from pyspark.sql.functions import max,avg,count,sum,min


In [0]:
%sql
-- Make retail_test the current database, then list its tables.
USE DATABASE retail_test;
SHOW TABLES;

In [0]:
# Load the orders table and show its row count as the cell output.
ordersdf = spark.sql("select * from retail_test.orders")
ordersdf.count()

In [0]:
# Total of the `total` column across all order items.
orderitemsdf = spark.sql("select total from retail_test.orderItems")
# Alias the aggregate Column: the original called .alias() on the resulting
# DataFrame, which left the output column named after the sum expression.
revenue = orderitemsdf.agg(sum("total").alias("Revenue"))
revenue.display()


# AGGREGATE FUNCTIONS

In [0]:
# Earliest and latest order_date in the orders table.
ordersdatedf = spark.sql("select * from retail_test.orders")
from pyspark.sql.functions import min, max

ordersdatemidate = ordersdatedf.agg(
    min("order_date").alias("minDate"),
    max("order_date").alias("maxDate"),
)
display(ordersdatemidate)

# Group By

In [0]:
# Group By  (was bare text in a code cell, which is a Python SyntaxError)

In [0]:
# Number of orders for each order_status.
ordersdf = spark.sql("select * from retail_test.orders")
statusCount = ordersdf.groupBy(ordersdf["order_status"]).count()
display(statusCount)


# Group By with a HAVING-style filter

In [0]:
# Statuses that have at least 5000 orders (the DataFrame equivalent of HAVING).
ordersdf = spark.sql("select * from retail_test.orders")
# Rename the aggregate column itself. The original called .alias("Count") on
# the DataFrame, which does not rename the `count` column, so the filter below
# only resolved through case-insensitive column matching.
statusCount = ordersdf.groupBy("order_status").count().withColumnRenamed("count", "Count")
from pyspark.sql.functions import col
filterdata = statusCount.where(col("Count") >= 5000)
filterdata.display()

In [0]:
# Sort the filtered statuses in descending order of order_status.
orderdatadf = filterdata.orderBy(filterdata["order_status"].desc())
display(orderdatadf)

In [0]:
# Average number of orders per distinct order_date.
orderscount = spark.sql("select * from retail_test.orders")
from pyspark.sql.functions import count,avg

# groupBy(...).count() names its aggregate column `count`; the original
# .alias("Count") was applied to the DataFrame and renamed nothing.
orderdatadf = orderscount.groupBy("order_date").count()
# Alias the aggregate Column so the output column is actually called "Average".
avaragedf = orderdatadf.agg(avg("count").alias("Average"))
avaragedf.display()


In [0]:
%sql
-- Show the column names and types of the orderItems table.
DESCRIBE retail_test.orderItems

In [0]:
# Load the whole orderItems table and render it.
orderitemsdf = spark.sql("select * from retail_test.orderItems")
display(orderitemsdf)

In [0]:
# Sum of `total` for each order_id, rounded to two decimals.
orderitemsdf = spark.sql("select * from retail_test.orderItems")
from pyspark.sql.functions import count,avg,sum,round
orderdatadf = (
    orderitemsdf
    .groupBy("order_id")
    .agg(round(sum("total"), 2).alias("totalValue"))
)

display(orderdatadf)