In [1]:
%sh
# Download the sample sales archive into the driver's current working directory.
# -f: fail on HTTP errors instead of saving an error page as the zip
# -L: follow redirects;  -sS: no progress meter, but still print errors
curl -fsSL -O 'https://raw.githubusercontent.com/bsullins/bensullins.com-freebies/master/sales_log.zip'
# Confirm the download is actually a zip archive.
file sales_log.zip

In [2]:
%sh
# Extract the CSVs into ./sales_log/. The -o flag overwrites existing files without
# prompting: the default interactive prompt cannot be answered from a %sh cell,
# so without -o a re-run of the notebook would not extract cleanly.
unzip -o sales_log.zip

In [3]:
%fs ls file:/databricks/driver/sales_log/

path,name,size
file:/databricks/driver/sales_log/sales-81.csv,sales-81.csv,9297
file:/databricks/driver/sales_log/sales-72.csv,sales-72.csv,9271
file:/databricks/driver/sales_log/sales-46.csv,sales-46.csv,9200
file:/databricks/driver/sales_log/sales-77.csv,sales-77.csv,9230
file:/databricks/driver/sales_log/sales-2.csv,sales-2.csv,9167
file:/databricks/driver/sales_log/sales-18.csv,sales-18.csv,9287
file:/databricks/driver/sales_log/sales-29.csv,sales-29.csv,9184
file:/databricks/driver/sales_log/sales-80.csv,sales-80.csv,9250
file:/databricks/driver/sales_log/sales-34.csv,sales-34.csv,9189
file:/databricks/driver/sales_log/sales-51.csv,sales-51.csv,9206


In [4]:
# Read the data

from pyspark.sql.types import DoubleType, StringType, StructField, StructType

# Local driver directory holding the extracted sales-*.csv files.
path = "file:/databricks/driver/sales_log/"

# Explicit schema for the sales CSV files. Supplying it avoids Spark's
# schema-inference pass for the batch read. It is also required by the streaming
# reader later in this notebook, because file-based streaming sources do not
# infer a schema by default.
# The third StructField argument is `nullable` (True = the column may contain nulls).
# NOTE(review): OrderID / Quantity / RowCount look integral in the sample output;
# consider LongType if the raw files confirm that.
salesSchema = StructType([
  StructField("OrderID", DoubleType(), True),
  StructField("OrderDate", StringType(), True),
  StructField("Quantity", DoubleType(), True),
  StructField("DiscountPct", DoubleType(), True),
  StructField("Rate", DoubleType(), True),
  StructField("SaleAmount", DoubleType(), True),
  StructField("CustomerName", StringType(), True),
  StructField("State", StringType(), True),
  StructField("Region", StringType(), True),
  StructField("ProductKey", StringType(), True),
  StructField("RowCount", DoubleType(), True),
  StructField("ProfitMargin", DoubleType(), True)
])

# Batch DataFrame over every CSV file in `path`. No header option is set, so
# every line of each file is parsed as a data row.
data = (
  spark.read.schema(salesSchema).csv(path)
)

# Register a temporary view named `sales` so the data can be queried with %sql.
# It is not persisted anywhere and disappears when the Spark session ends.
data.createOrReplaceTempView('sales')
display(data)

# The next cells query the `sales` view directly and build visualizations from it.

OrderID,OrderDate,Quantity,DiscountPct,Rate,SaleAmount,CustomerName,State,Region,ProductKey,RowCount,ProfitMargin
44935.0,11/17/10,37.0,0.06,200.0,7011.4,Arthur Nelson,New Mexico,West,Development - Scala,1.0,0.65
44935.0,11/17/10,37.0,0.01,140.0,5169.04,Arthur Nelson,Florida,South,Development - Database,1.0,0.64
2563.0,11/18/10,12.0,0.04,200.0,2322.41,Brenda Hildebrand,Mississippi,South,Development - Scala,1.0,0.74
2563.0,11/18/10,33.0,0.01,120.0,3951.72,Brenda Hildebrand,Mississippi,South,Development - Business Logic,1.0,0.44
2752.0,11/18/10,30.0,0.03,150.0,4399.87,Todd Cacioppo,Wisconsin,Central,Consulting - Business Model,1.0,0.53
2752.0,11/18/10,41.0,0.02,140.0,5670.14,Todd Cacioppo,Wisconsin,Central,Development - Database,1.0,0.56
2752.0,11/18/10,10.0,0.03,200.0,1955.5,Todd Cacioppo,Wisconsin,Central,Development - Big Data,1.0,0.69
2752.0,11/18/10,10.0,0.04,150.0,1451.5,Todd Cacioppo,Wisconsin,Central,Consulting - Market Research,1.0,0.7
13607.0,11/18/10,12.0,0.07,150.0,1687.37,Matthew Lucas,New Mexico,West,Consulting - Business Model,1.0,0.59
13607.0,11/18/10,37.0,0.09,200.0,6787.8,Matthew Lucas,New Mexico,West,Development - Scala,1.0,0.74


In [5]:

%sql
-- Every row of the `sales` temporary view registered by the batch read.
SELECT * FROM sales

OrderID,OrderDate,Quantity,DiscountPct,Rate,SaleAmount,CustomerName,State,Region,ProductKey,RowCount,ProfitMargin
44935.0,11/17/10,37.0,0.06,200.0,7011.4,Arthur Nelson,New Mexico,West,Development - Scala,1.0,0.65
44935.0,11/17/10,37.0,0.01,140.0,5169.04,Arthur Nelson,Florida,South,Development - Database,1.0,0.64
2563.0,11/18/10,12.0,0.04,200.0,2322.41,Brenda Hildebrand,Mississippi,South,Development - Scala,1.0,0.74
2563.0,11/18/10,33.0,0.01,120.0,3951.72,Brenda Hildebrand,Mississippi,South,Development - Business Logic,1.0,0.44
2752.0,11/18/10,30.0,0.03,150.0,4399.87,Todd Cacioppo,Wisconsin,Central,Consulting - Business Model,1.0,0.53
2752.0,11/18/10,41.0,0.02,140.0,5670.14,Todd Cacioppo,Wisconsin,Central,Development - Database,1.0,0.56
2752.0,11/18/10,10.0,0.03,200.0,1955.5,Todd Cacioppo,Wisconsin,Central,Development - Big Data,1.0,0.69
2752.0,11/18/10,10.0,0.04,150.0,1451.5,Todd Cacioppo,Wisconsin,Central,Consulting - Market Research,1.0,0.7
13607.0,11/18/10,12.0,0.07,150.0,1687.37,Matthew Lucas,New Mexico,West,Consulting - Business Model,1.0,0.59
13607.0,11/18/10,37.0,0.09,200.0,6787.8,Matthew Lucas,New Mexico,West,Development - Scala,1.0,0.74


In [6]:
%sql
-- Total (rounded) sales per product, highest first.
SELECT
  productkey AS products,
  ROUND(SUM(saleamount)) AS totalsales
FROM sales
GROUP BY productkey
ORDER BY totalsales DESC
LIMIT 100

products,totalsales
Development - Java,3524581.0
Consulting - Market Research,3311013.0
Development - Python,3261009.0
Consulting - Business Model,2769287.0
Development - Big Data,2644929.0
Training - SQL,2475627.0
Development - Business Logic,1948799.0
Development - .Net,1775945.0
Development - Scala,1400128.0
Training - Development,1149507.0


In [7]:

# Namespaced import: `from pyspark.sql.functions import *` would shadow Python
# builtins such as sum, round, max and min in the notebook's global namespace.
from pyspark.sql import functions as F

# Streaming counterpart of the batch `data` DataFrame: same schema and path,
# but read with `readStream` so the CSV files are consumed incrementally.
streamingInputDF = (
  spark
    .readStream
    .schema(salesSchema)              # file streaming sources require an explicit schema
    .option("maxFilesPerTrigger", 1)  # simulate a stream by consuming one file per micro-batch
    .csv(path)
)

# Running total of SaleAmount per ProductKey. Unlike the batch SQL query, this is
# neither rounded nor sorted. The result column is named `sum(SaleAmount)`.
streamingCountsDF = (
  streamingInputDF
    .groupBy("ProductKey")
    .agg(F.sum("SaleAmount"))
)

# Check that this is really a streaming DataFrame. A bare
# `streamingCountsDF.isStreaming` in the middle of a cell is evaluated but never shown.
assert streamingCountsDF.isStreaming, "expected a streaming DataFrame"

display(streamingCountsDF)



ProductKey,sum(SaleAmount)
Development - Business Logic,32263.01
Consulting - Market Research,24016.42
Training - Development,13946.36
Development - PHP,5331.03
Development - Database,11728.77
Training - Javascript,8146.44
Training - SQL,25376.09
Consulting - Strategy,13902.3
Development - .Net,15566.59
Development - Big Data,3815.46


In [8]:
# Write the running per-product totals to an in-memory table named `sales_stream`
# so they can be queried with %sql.
# Stop any earlier run of this query first. Starting a query whose name is already
# active in the session raises an error, which would make this cell fail on re-run.
for activeQuery in spark.streams.active:
    if activeQuery.name == "sales_stream":
        activeQuery.stop()

query = (
  streamingCountsDF.writeStream
    .format("memory")           # memory sink: output is kept in driver memory; for debugging / small data only
    .queryName("sales_stream")  # name of the in-memory table to query with SQL
    .outputMode("complete")     # complete: the whole aggregate table is rewritten on every trigger
    .start()
)

# NOTE: the table stays empty until the first micro-batch commits, so querying it
# immediately can return no rows. Call query.processAllAvailable() first if a
# complete snapshot is needed (it blocks until every file has been processed).

In [9]:
%sql
-- Current snapshot of the streaming per-product totals, largest first.
SELECT *
FROM sales_stream
ORDER BY `sum(SaleAmount)` DESC
LIMIT 100

ProductKey,sum(SaleAmount)
