In [1]:
%sh
# Download the zipped sales log into the driver's working directory.
# -f: fail on HTTP errors instead of saving the error page as sales_log.zip
# -L: follow redirects
# set -e: stop at the first failing command so `file` never inspects a bad download
set -e
curl -fL -O https://raw.githubusercontent.com/bsullins/bensullins.com-freebies/master/sales_log.zip
# Sanity check: report the detected type of the downloaded file.
file sales_log.zip

In [2]:
%sh
# Extract the archive; -o overwrites existing files without prompting so the
# cell can be re-run (the prompt cannot be answered from a notebook cell).
unzip -o sales_log.zip

In [3]:
%fs ls 'file:/databricks/driver/sales_log/'

path,name,size
file:/databricks/driver/sales_log/sales-62.csv,sales-62.csv,9235.0
file:/databricks/driver/sales_log/sales-24.csv,sales-24.csv,9208.0
file:/databricks/driver/sales_log/sales-46.csv,sales-46.csv,9200.0
file:/databricks/driver/sales_log/sales-67.csv,sales-67.csv,9155.0
file:/databricks/driver/sales_log/sales-48.csv,sales-48.csv,9177.0
file:/databricks/driver/sales_log/sales-79.csv,sales-79.csv,9276.0
file:/databricks/driver/sales_log/sales-64.csv,sales-64.csv,9189.0
file:/databricks/driver/sales_log/sales-20.csv,sales-20.csv,9350.0
file:/databricks/driver/sales_log/sales-21.csv,sales-21.csv,9242.0
file:/databricks/driver/sales_log/.DS_Store,.DS_Store,6148.0


In [4]:
from pyspark.sql.types import *

# Driver-local directory that holds the unzipped sales CSV files.
path = "file:/databricks/driver/sales_log/"

# Explicit schema for the sales files, shared by the static and streaming reads.
# Every column is nullable; text columns are strings, numeric columns are doubles.
salesSchema = (
  StructType()
    .add("OrderID", DoubleType(), True)
    .add("OrderDate", StringType(), True)
    .add("Quantity", DoubleType(), True)
    .add("DiscountPct", DoubleType(), True)
    .add("Rate", DoubleType(), True)
    .add("SaleAmount", DoubleType(), True)
    .add("CustomerName", StringType(), True)
    .add("State", StringType(), True)
    .add("Region", StringType(), True)
    .add("ProductKey", StringType(), True)
    .add("RowCount", DoubleType(), True)
    .add("ProfitMargin", DoubleType(), True)
)

# Static (batch) DataFrame over all the files under `path`.
data = spark.read.csv(path, schema=salesSchema)

# Register a temp view named "sales" so the data can be queried from %sql cells.
data.createOrReplaceTempView("sales")

display(data)

OrderID,OrderDate,Quantity,DiscountPct,Rate,SaleAmount,CustomerName,State,Region,ProductKey,RowCount,ProfitMargin
44935.0,11/17/10,37.0,0.06,200.0,7011.4,Arthur Nelson,New Mexico,West,Development - Scala,1.0,0.65
44935.0,11/17/10,37.0,0.01,140.0,5169.04,Arthur Nelson,Florida,South,Development - Database,1.0,0.64
2563.0,11/18/10,12.0,0.04,200.0,2322.41,Brenda Hildebrand,Mississippi,South,Development - Scala,1.0,0.74
2563.0,11/18/10,33.0,0.01,120.0,3951.72,Brenda Hildebrand,Mississippi,South,Development - Business Logic,1.0,0.44
2752.0,11/18/10,30.0,0.03,150.0,4399.87,Todd Cacioppo,Wisconsin,Central,Consulting - Business Model,1.0,0.53
2752.0,11/18/10,41.0,0.02,140.0,5670.14,Todd Cacioppo,Wisconsin,Central,Development - Database,1.0,0.56
2752.0,11/18/10,10.0,0.03,200.0,1955.5,Todd Cacioppo,Wisconsin,Central,Development - Big Data,1.0,0.69
2752.0,11/18/10,10.0,0.04,150.0,1451.5,Todd Cacioppo,Wisconsin,Central,Consulting - Market Research,1.0,0.7
13607.0,11/18/10,12.0,0.07,150.0,1687.37,Matthew Lucas,New Mexico,West,Consulting - Business Model,1.0,0.59
13607.0,11/18/10,37.0,0.09,200.0,6787.8,Matthew Lucas,New Mexico,West,Development - Scala,1.0,0.74


In [5]:
%sql
-- Preview the rows of the `sales` temp view.
select *
from sales

OrderID,OrderDate,Quantity,DiscountPct,Rate,SaleAmount,CustomerName,State,Region,ProductKey,RowCount,ProfitMargin
44935.0,11/17/10,37.0,0.06,200.0,7011.4,Arthur Nelson,New Mexico,West,Development - Scala,1.0,0.65
44935.0,11/17/10,37.0,0.01,140.0,5169.04,Arthur Nelson,Florida,South,Development - Database,1.0,0.64
2563.0,11/18/10,12.0,0.04,200.0,2322.41,Brenda Hildebrand,Mississippi,South,Development - Scala,1.0,0.74
2563.0,11/18/10,33.0,0.01,120.0,3951.72,Brenda Hildebrand,Mississippi,South,Development - Business Logic,1.0,0.44
2752.0,11/18/10,30.0,0.03,150.0,4399.87,Todd Cacioppo,Wisconsin,Central,Consulting - Business Model,1.0,0.53
2752.0,11/18/10,41.0,0.02,140.0,5670.14,Todd Cacioppo,Wisconsin,Central,Development - Database,1.0,0.56
2752.0,11/18/10,10.0,0.03,200.0,1955.5,Todd Cacioppo,Wisconsin,Central,Development - Big Data,1.0,0.69
2752.0,11/18/10,10.0,0.04,150.0,1451.5,Todd Cacioppo,Wisconsin,Central,Consulting - Market Research,1.0,0.7
13607.0,11/18/10,12.0,0.07,150.0,1687.37,Matthew Lucas,New Mexico,West,Consulting - Business Model,1.0,0.59
13607.0,11/18/10,37.0,0.09,200.0,6787.8,Matthew Lucas,New Mexico,West,Development - Scala,1.0,0.74


In [6]:
%sql
-- Total sale amount per product (rounded), largest first, capped at 100 rows.
select
  ProductKey as Products,
  round(sum(SaleAmount)) as TotalSales
from sales
group by ProductKey
order by TotalSales desc
limit 100

Products,TotalSales
Development - Java,3524581.0
Consulting - Market Research,3311013.0
Development - Python,3261009.0
Consulting - Business Model,2769287.0
Development - Big Data,2644929.0
Training - SQL,2475627.0
Development - Business Logic,1948799.0
Development - .Net,1775945.0
Development - Scala,1400128.0
Training - Development,1149507.0


# Streaming Setup
Now we'll convert the analysis above into a streaming solution by reading the files in one at a time.

In [8]:
from pyspark.sql.functions import *

# Streaming counterpart of the static `data` DataFrame: `readStream` instead of `read`.
streamingInputDF = (
  spark
    .readStream
    .format("csv")
    .schema(salesSchema)              # Same schema as the static read
    .option("maxFilesPerTrigger", 1)  # Pick up one file per trigger so the directory behaves like a stream
    .load(path)
)

# Running total of SaleAmount per ProductKey; the result columns are
# `ProductKey` and `sum(SaleAmount)` (unrounded, unlike the static SQL query).
streamingCountsDF = streamingInputDF.groupBy("ProductKey").sum("SaleAmount")

# Check whether the aggregated DataFrame is a streaming DataFrame.
streamingCountsDF.isStreaming

In [9]:
# Query names must be unique among active queries, so re-running this cell
# while an earlier run is still streaming would fail. Stop any active query
# that already uses the name before starting a new one.
for active_query in spark.streams.active:
  if active_query.name == "sales_stream":
    active_query.stop()

# Start the streaming aggregation and expose its result as an in-memory table.
query = (
  streamingCountsDF
    .writeStream
    .format("memory")            # memory = store the result in an in-memory table (for testing)
    .queryName("sales_stream")   # sales_stream = name of the in-memory table queried via SQL
    .outputMode("complete")      # complete = the table holds the full aggregated result after each trigger
    .start()
)

In [10]:
%sql
-- Current contents of the in-memory streaming table, largest totals first.
select *
from sales_stream
order by `sum(SaleAmount)` desc
limit 100

ProductKey,sum(SaleAmount)
Consulting - Market Research,135985.28999999998
Development - Java,135291.44
Development - Python,116044.06
Development - Big Data,100828.91
Consulting - Business Model,96701.11999999998
Training - SQL,76646.94
Development - .Net,69632.86000000002
Development - Business Logic,63567.36
Consulting - Strategy,49782.78
Training - Development,43184.85
