# Data Pipeline using Databricks
This note book demonstrates how to use Databricks to **build sales data ETL pipeline** that is **executed everyday** at 10 am.

## Load Dataset
An online-retail dataset, provided by databricks, was used for this demo. The dataset can be found in the following directory.

In [0]:
%fs ls "/databricks-datasets/online_retail/data-001"

path,name,size,modificationTime
dbfs:/databricks-datasets/online_retail/data-001/data.csv,data.csv,5357240,1466388696000


Using the _head_ command, some content of the csv file is displayed for later SQL table buildup.

In [0]:
%fs head --maxBytes=248 "/databricks-datasets/online_retail/data-001/data.csv"

Read in sales data from the cloud.

In [0]:
from pyspark.sql.types import DoubleType, IntegerType, StringType, TimestampType, StructType, StructField

# Define variables used in the code below
file_path = "/databricks-datasets/online_retail/data-001/"
table_name = "raw_sales_data"
checkpoint_path = "/tmp/pipeline_get_started/_checkpoint/sales_data"

schema = StructType(
  [
    StructField("InvoiceNo", StringType(), True),
    StructField("StockCode", StringType(), True),
    StructField("Description", StringType(), True),
    StructField("Quantity", IntegerType(), True),
    StructField("InvoiceDate", StringType(), True),
    StructField("UnitPrice", DoubleType(), True),
    StructField("CustomerID", DoubleType(), True),
    StructField("Country", IntegerType(), True)
  ]
)

(spark.readStream
  .format("cloudFiles")
  .schema(schema)
  .option("cloudFiles.format", "csv")
  .option("sep",",")
  .load(file_path)
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable(table_name)
)

<pyspark.sql.streaming.query.StreamingQuery at 0x7f80a8386d70>

Create table and insert data.

In [0]:
%sql
CREATE OR REPLACE TABLE
  sales_data (
    InvoiceNo STRING,
    StockCode STRING,
    Description STRING,
    Quantity DOUBLE,
    InvoiceDate STRING,
    UnitPrice DOUBLE,
    CustomerID STRING,
    Country DOUBLE);

INSERT INTO
  sales_data
SELECT
  InvoiceNo,
  StockCode,
  Description,
  Quantity,
  InvoiceDate,
  UnitPrice,
  CustomerID,
  Country
FROM
  raw_sales_data

num_affected_rows,num_inserted_rows
65500,65500


Select sales data of product of interest and saved as pyspark dataframe.

In [0]:
%sql
SELECT *
FROM
  sales_data
WHERE
  Description = 'FELTCRAFT CUSHION RABBIT'

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536412,22570,FELTCRAFT CUSHION RABBIT,3.0,12/1/10 11:49,3.75,17920.0,
536528,22570,FELTCRAFT CUSHION RABBIT,1.0,12/1/10 13:17,3.75,15525.0,
536557,22570,FELTCRAFT CUSHION RABBIT,1.0,12/1/10 14:41,3.75,17841.0,
536592,22570,FELTCRAFT CUSHION RABBIT,1.0,12/1/10 17:06,7.62,,
536624,22570,FELTCRAFT CUSHION RABBIT,4.0,12/2/10 10:45,3.75,13418.0,
536754,22570,FELTCRAFT CUSHION RABBIT,1.0,12/2/10 14:09,3.75,14449.0,
536793,22570,FELTCRAFT CUSHION RABBIT,4.0,12/2/10 15:39,3.75,16203.0,
536808,22570,FELTCRAFT CUSHION RABBIT,12.0,12/2/10 16:46,3.75,17659.0,
537030,22570,FELTCRAFT CUSHION RABBIT,8.0,12/3/10 16:44,3.75,16455.0,
537034,22570,FELTCRAFT CUSHION RABBIT,8.0,12/3/10 17:20,3.75,13081.0,
