In [0]:
%sh
# Show what is currently in /tmp, the directory the download cell writes to.
ls /tmp

In [0]:
%sh
# Download the retail_db CSVs and the AdventureWorks product list into /tmp.
#
# -O writes each download to a fixed path, so re-running this cell overwrites the
# previous copy instead of leaving orders.csv.1, orders.csv.2, ... beside it.
# -nv keeps the output to one line per file while still reporting errors.
# set -e aborts the cell on the first failed download rather than continuing silently.
set -e

BASE_URL="https://raw.githubusercontent.com/msellamiTN/Apache-Spark-Training/master"

for name in orders order_items products categories; do
  wget -nv -O "/tmp/${name}.csv" "${BASE_URL}/retail_db/${name}.csv"
done

# The directory name contains a space, hence the %20 in the URL.
wget -nv -O "/tmp/AdventureWorks_Products.csv" "${BASE_URL}/Adventure%20Works/AdventureWorks_Products.csv"

In [0]:
%python
localOrderFilePath = "file:/tmp/orders.csv"
localOrderItemFilePath = "file:/tmp/order_items.csv"
localProductFilePath = "file:/tmp/products.csv"
localCategoriesFilePath = "file:/tmp/categories.csv"
localAdventureWorks_ProductsPath = "file:/tmp/AdventureWorks_Products.csv"

dbutils.fs.mkdirs("dbfs:/datasets")
dbutils.fs.cp(localOrderFilePath, "dbfs:/datasets/")#hadoop fs -put * /hdfs
dbutils.fs.cp(localOrderItemFilePath, "dbfs:/datasets")
dbutils.fs.cp(localProductFilePath, "dbfs:/datasets/")
dbutils.fs.cp(localCategoriesFilePath, "dbfs:/datasets")
dbutils.fs.cp(localAdventureWorks_ProductsPath, "dbfs:/datasets")

display(dbutils.fs.ls("dbfs:/datasets"))

path,name,size
dbfs:/datasets/AdventureWorks_Products.csv,AdventureWorks_Products.csv,58122
dbfs:/datasets/categories.csv,categories.csv,1029
dbfs:/datasets/order_items.csv,order_items.csv,5408880
dbfs:/datasets/orders.csv,orders.csv,2999944
dbfs:/datasets/products.csv,products.csv,171902


In [0]:
%python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import *
 
# Obtain the Spark session for this notebook. getOrCreate() hands back the
# session that is already running when there is one, otherwise it builds a new
# one with the application name and Hive support requested here.
spark = (
    SparkSession.builder
    .appName("Retail Data Analysis With SPARK DataFrame")
    .enableHiveSupport()
    .getOrCreate()
)

# Last expression of the cell: renders the session summary as the cell output.
spark

In [0]:
# Untyped loads of the five staged files. No schema or header option is given,
# so Spark uses its CSV defaults for column names and types.
ordersDF = spark.read.format("csv").load("dbfs:/datasets/orders.csv")
productsDF = spark.read.format("csv").load("dbfs:/datasets/products.csv")
orderItemsDF = spark.read.format("csv").load("dbfs:/datasets/order_items.csv")
categoriesDF = spark.read.format("csv").load("dbfs:/datasets/categories.csv")
AdventureWorksProductsDF = spark.read.format("csv").load(
    "dbfs:/datasets/AdventureWorks_Products.csv"
)

In [0]:
# Explicit schema for orders.csv, which is read as a header-less file.
# NOTE(review): confirm the date text in the file parses as DateType; under
# Spark's default permissive mode an unparseable value is loaded as null.
ordersSchema = StructType([
    StructField("OrderID", IntegerType(), True),
    StructField("OrderDate", DateType(), True),
    StructField("CustomerID", IntegerType(), True),
    StructField("OrderStatus", StringType(), True),
])

# Read through the `spark` session created earlier in this notebook instead of
# the deprecated, implicitly injected `sqlContext` global.
ordersDFWithSchema = (
    spark.read.format("csv")
    .option("delimiter", ",")
    .option("quote", "")  # empty string turns quote handling off
    .option("header", "false")
    .schema(ordersSchema)
    .load("dbfs:/datasets/orders.csv")
)
 
#ordersDFWithSchema.show()

# Explicit schema for products.csv, which is read as a header-less file.
# NOTE(review): "Descriptione" looks like a typo for "Description"; the name is
# kept unchanged because other cells or notebooks may select it.
# NOTE(review): verify that the file has exactly these five columns in this order.
productsSchema = StructType([
    StructField("ProductID", IntegerType(), True),
    StructField("CategoryID", IntegerType(), True),
    StructField("Descriptione", StringType(), True),
    StructField("Price", DoubleType(), True),
    StructField("Image", StringType(), True),
])

# Read through the `spark` session created earlier in this notebook instead of
# the deprecated, implicitly injected `sqlContext` global.
productsDFWithSchema = (
    spark.read.format("csv")
    .option("delimiter", ",")
    # Empty string turns quote handling off.
    # NOTE(review): a quoted field containing a comma would then be split.
    .option("quote", "")
    .option("header", "false")
    .schema(productsSchema)
    .load("dbfs:/datasets/products.csv")
)
 
#productsDFWithSchema.show()

# Explicit schema for order_items.csv, which is read as a header-less file.
# Fields are nullable (the StructField default).
ordersItemSchema = StructType([
    StructField("OrderItemID", IntegerType()),
    StructField("OrderID", IntegerType()),
    StructField("ProductID", IntegerType()),
    StructField("Quantity", DoubleType()),
    StructField("Subtotal", DoubleType()),
    StructField("ProductPrice", DoubleType()),
])

# Read through the `spark` session created earlier in this notebook instead of
# the deprecated `sqlContext` global; .csv() already selects the CSV source, so
# the extra .format("csv") call is dropped.
ordersItemDFWithSchema = (
    spark.read
    .option("header", "false")
    .schema(ordersItemSchema)
    .csv("dbfs:/datasets/order_items.csv")
)

# Explicit schema for categories.csv, which is read as a header-less file.
# Fields are nullable (the StructField default).
categorySchema = StructType([
    StructField("CategoryID", IntegerType()),
    StructField("CategoryDepartmentID", IntegerType()),
    StructField("CategoryName", StringType()),
])

# Read through the `spark` session created earlier in this notebook instead of
# the deprecated `sqlContext` global; .csv() already selects the CSV source, so
# the extra .format("csv") call is dropped.
categoryDFWithSchema = (
    spark.read
    .option("header", "false")
    .schema(categorySchema)
    .csv("dbfs:/datasets/categories.csv")
)

In [0]:
# Preview the first five typed order rows.
ordersDFWithSchema.show(n=5)

In [0]:
# Preview the first five typed product rows.
productsDFWithSchema.show(n=5)

In [0]:
# Preview the first five typed order-item rows.
ordersItemDFWithSchema.show(n=5)

In [0]:
# Join the tables step by step into one wide DataFrame.

# Orders with their order items, matched on OrderID.
OrderjoinedDf = ordersDFWithSchema.join(
    ordersItemDFWithSchema, on=["OrderID"], how="inner"
)

# Add the product attributes, matched on ProductID.
ProductjoinedDf = OrderjoinedDf.join(
    productsDFWithSchema, on=["ProductID"], how="inner"
)

# Add the category attributes, matched on CategoryID.
fullDF = ProductjoinedDf.join(
    categoryDFWithSchema, on=["CategoryID"], how="inner"
)

In [0]:
# Preview the fully joined DataFrame (show() defaults: 20 rows, long values truncated).
fullDF.show(n=20, truncate=True)

In [0]:
# Per-customer summary: number of orders, revenue, average revenue per order
# and date of the most recent order.
#
# Each input row is one order line, so:
#   * orders are counted as distinct OrderID values (summing Quantity would
#     count items, not orders);
#   * revenue is the sum of the line Subtotal (summing ProductPrice would
#     ignore the quantity bought on each line);
#   * average order revenue is total revenue divided by the number of orders.
# NOTE(review): assumes Subtotal is the line amount (quantity x unit price) - confirm.
CustomerSummary = (
    ProductjoinedDf
    .groupBy("CustomerID")
    .agg(
        F.countDistinct("OrderID").alias("Total Orders"),
        F.sum("Subtotal").alias("Total Revenue"),
        (F.sum("Subtotal") / F.countDistinct("OrderID")).alias("Avg Order Revenue"),
        F.max("OrderDate").alias("Last Order Date"),
    )
)

CustomerSummary.show()

In [0]:
# Per-product summary: quantity sold, revenue and price statistics.
#
# Each input row is one order line. Revenue figures are based on the line
# Subtotal; summing or averaging ProductPrice would ignore the quantity bought.
# NOTE(review): assumes Subtotal is the line amount (quantity x unit price) - confirm.

# Rate applied in the "Avg Discount" column below.
DISCOUNT_RATE = 0.30

ProductSummary = (
    fullDF
    .groupBy("ProductID")
    .agg(
        F.sum("Quantity").alias("Total Quantity"),
        F.sum("Subtotal").alias("Total Revenue"),
        F.avg("Subtotal").alias("Avg Order Revenue"),
        F.avg("Quantity").alias("Avg Quantity"),
        # NOTE(review): this is the average price *after* taking DISCOUNT_RATE
        # off, not the discount amount itself - confirm which one is intended.
        F.avg(
            F.col("ProductPrice") - F.col("ProductPrice") * DISCOUNT_RATE
        ).alias("Avg Discount"),
        F.avg("Price").alias("Avg Price"),
    )
)

In [0]:
%python

# Publish both summaries as global temporary views and read them back via SQL.
# createOrReplaceGlobalTempView is used so the cell can be re-run:
# createGlobalTempView raises once a view with the same name already exists.
CustomerSummary.createOrReplaceGlobalTempView("CustomerSummary")
sqlCustomerSummary = spark.sql("SELECT * FROM global_temp.CustomerSummary")

ProductSummary.createOrReplaceGlobalTempView("ProductSummary")
sqlProductSummary = spark.sql("SELECT * FROM global_temp.ProductSummary")

In [0]:
# Publish the joined DataFrame as a global temporary view and read it back via SQL.
# createOrReplaceGlobalTempView is used so the cell can be re-run:
# createGlobalTempView raises once a view with the same name already exists.
fullDF.createOrReplaceGlobalTempView("fullDF")
sqlfullDF = spark.sql("SELECT * FROM global_temp.fullDF")