In [2]:
# Sanity check: evaluating `spark` displays the SparkSession, the entry point introduced in Apache Spark 2.0.
# If this cell fails, the notebook is not attached to a Spark 2.0 cluster:
# recreate the cluster and select the version "Spark 2.0 (Scala 2.10)".
spark

In [4]:
%fs ls /FileStore/tables

Data

In [6]:
%fs head /FileStore/tables/Online_Retail_formatted_1-3daf0.csv

Batch/Interactive Processing

In [8]:
from pyspark.sql.types import *

inputPath = "/FileStore/tables/"

# No explicit schema is supplied for this batch read: column names come from the
# header row and column types are inferred by Spark ("inferschema"), which costs
# an extra pass over the files.

# Static DataFrame representing data in the CSV files
staticInputDF = (
  spark
    .read
    .format("csv")
    .option("header", "true")
    .option("inferschema","true")
    # inputPath already ends with "/"; strip it so the glob has no accidental "//"
    .load(inputPath.rstrip("/") + "/*.csv")
)

display(staticInputDF)


In [9]:
# Number of rows loaded from the CSV files into the static DataFrame
staticInputDF.count()

In [10]:
from pyspark.sql.functions import *

# Add InvoiceDateTime: InvoiceDate parsed with the "MM/dd/yyyy HH:mm" pattern
# (unix_timestamp yields epoch seconds, which are then cast to a timestamp).
df = staticInputDF.withColumn(
    "InvoiceDateTime",
    unix_timestamp("InvoiceDate", "MM/dd/yyyy HH:mm").cast("timestamp"),
)
display(df)

In [11]:
from pyspark.sql.functions import *      # brings window() into scope

# Row count per (1-week window of InvoiceDateTime, Country), cached so that
# later queries against it can reuse the computed result.
staticCountsDF = (
  df
  .groupBy(window("InvoiceDateTime", "1 week"), "Country")
  .count()
  .cache()
)

# Expose the result to SQL as the temporary view 'static_counts'
staticCountsDF.createOrReplaceTempView("static_counts")

In [12]:
%sql
-- Show every row of the weekly per-country counts
SELECT *
FROM static_counts

Transactions processed by each country in each one-week window

In [14]:
%sql
-- Sort on the window's start timestamp, not on the formatted `date` string:
-- "MM-dd-yy" text sorts by month first, so weeks from different years interleave.
select Country, date_format(window.start, "MM-dd-yy") as date, count as Transactions
from static_counts
order by window.start, Country

In [16]:
from pyspark.sql.functions import *

# Streaming file sources need their schema up front, so it is declared explicitly here.
# NOTE(review): InvoiceDateTime is declared in the schema but is overwritten by the
# withColumn() call below - confirm whether the CSV files really contain that column.
csvschema = StructType([
  StructField("InvoiceNo", StringType(), True),
  StructField("StockCode", StringType(), True),
  StructField("Description", StringType(), True),
  StructField("Quantity", IntegerType(), True),
  StructField("UnitPrice", DoubleType(), True),
  StructField("Sales", DoubleType(), True),
  StructField("InvoiceDate", StringType(), True),
  StructField("CustomerID", IntegerType(), True),
  StructField("Country", StringType(), True),
  StructField("InvoiceDateTime", TimestampType(), True),
])

# Similar to definition of staticInputDF above, just using `readStream` instead of `read`.
# No "inferschema" option: it has no effect once an explicit schema is supplied.
streamingInputDF = (
  spark
    .readStream
    .format("csv")
    .option("header", "true")
    .option("maxFilesPerTrigger", 1)   # consider at most one new file per trigger
    .schema(csvschema)
    # inputPath already ends with "/"; strip it so the glob has no accidental "//"
    .load(inputPath.rstrip("/") + "/*.csv")
    # Parse the "MM/dd/yyyy HH:mm" InvoiceDate strings into a timestamp column
    .withColumn("InvoiceDateTime", unix_timestamp("InvoiceDate", "MM/dd/yyyy HH:mm").cast("timestamp"))
)

# Same 1-week windowed count as staticCountsDF, but additionally grouped by Description
streamingCountsDF = (
  streamingInputDF
  .groupBy(window(streamingInputDF.InvoiceDateTime, "1 week"), streamingInputDF.Country, streamingInputDF.Description)
  .count()
)


# Is this DF actually a streaming DF?
streamingCountsDF.isStreaming


In [17]:
# Render the streaming input DataFrame (schema columns with the parsed InvoiceDateTime)
display(streamingInputDF)

In [18]:
# Use only 2 shuffle partitions so the shuffles stay small
spark.conf.set("spark.sql.shuffle.partitions", "2")

# Start the streaming aggregation and keep its full result in an in-memory table.
query = (
  streamingCountsDF.writeStream
    .queryName("counts")     # name of the in-memory table the SQL cells read
    .outputMode("complete")  # the table always holds all the counts
    .format("memory")        # in-memory sink (for testing only in Spark 2.0)
    .start()
)

In [19]:
from time import sleep
sleep(15)  # pause 15 s so the streaming query has time to start computing

In [20]:
%sql
-- Number of rows currently held in the in-memory `counts` table
SELECT count(*)
FROM counts

Transactions processed per product description by each country in each 1-week window

In [23]:
%sql
-- Sort on the window's start timestamp, not on the formatted `date` string:
-- "MM-dd-yy" text sorts by month first, so weeks from different years interleave.
select Description, Country, date_format(window.start, "MM-dd-yy") as date, count as Transactions
from counts
order by window.start, Country, Description

In [24]:
sleep(5)  # wait a bit more so the running query can process more data

In [25]:
%sql
-- Sort on the window's start timestamp, not on the formatted `date` string:
-- "MM-dd-yy" text sorts by month first, so weeks from different years interleave.
select Description, Country, date_format(window.start, "MM-dd-yy") as date, count as Transactions
from counts
order by window.start, Country, Description

In [26]:
%sql
-- Number of rows currently held in the in-memory `counts` table
SELECT count(*)
FROM counts

In [27]:
sleep(5)  # wait a bit more so the running query can process more data

In [28]:
%sql
-- Sort on the window's start timestamp, not on the formatted `date` string:
-- "MM-dd-yy" text sorts by month first, so weeks from different years interleave.
select Description, Country, date_format(window.start, "MM-dd-yy") as date, count as Transactions
from counts
order by window.start, Country, Description

In [29]:
sleep(5)  # wait a bit more so the running query can process more data

In [30]:
sleep(10)  # wait another 10 s so that more data is computed before the next query

In [31]:
%sql
-- Sort on the window's start timestamp, not on the formatted `date` string:
-- "MM-dd-yy" text sorts by month first, so weeks from different years interleave.
select Description, Country, date_format(window.start, "MM-dd-yy") as date, count as Transactions
from counts
order by window.start, Country, Description

In [32]:
%sql
-- Number of rows currently held in the in-memory `counts` table
SELECT count(*)
FROM counts