In [17]:
from pyspark.sql import functions

from src.utils import create_spark_session


In [18]:
# Create the session through the project helper, then set the number of
# shuffle partitions used by Spark SQL to 5.
# NOTE(review): 'Streamming' is misspelled; presumably this string is the
# application name -- confirm against create_spark_session before renaming.
spark = create_spark_session('Streamming Intro')
spark.conf.set("spark.sql.shuffle.partitions", "5")

In [22]:
# batch code
from pyspark.sql.functions import window, column, desc, col


# Read every per-day CSV file in one batch, with a header row and inferred
# column types.
staticDataFrame = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("data/retail-data/by-day/*.csv")
)

# Register the data as a SQL view and keep the inferred schema for reuse.
staticDataFrame.createOrReplaceTempView("retail_data")
staticSchema = staticDataFrame.schema

# Total cost per customer per one-day window; preview the first five rows.
(
    staticDataFrame
    .selectExpr(
        "CustomerId",
        "(UnitPrice * Quantity) as total_cost",
        "InvoiceDate",
    )
    .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
    .sum("total_cost")
    .show(5)
)

+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   14075.0|[2011-12-04 16:00...|316.78000000000003|
|   18180.0|[2011-12-04 16:00...|            310.73|
|   15358.0|[2011-12-04 16:00...| 830.0600000000003|
|   15392.0|[2011-12-04 16:00...|304.40999999999997|
|   15290.0|[2011-12-04 16:00...|263.02000000000004|
+----------+--------------------+------------------+
only showing top 5 rows



In [31]:
# Streaming version of the batch aggregation.
#
# maxFilesPerTrigger: number of files read in at once.
# It is set to 1 here to make the static directory behave stream-like.
streamingDataFrame = (
    spark.readStream
    .schema(staticSchema)
    .option("maxFilesPerTrigger", 1)
    .format("csv")
    .option("header", "true")
    .load("data/retail-data/by-day/*.csv")
)

# Not the last expression of the cell, so its value is not displayed.
streamingDataFrame.isStreaming

# NOTE(review): despite the "PerHour" name, the window below is "1 day".
purchaseByCustomerPerHour = (
    streamingDataFrame
    .selectExpr(
        "CustomerId",
        "(UnitPrice * Quantity) as total_cost",
        "InvoiceDate",
    )
    .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
    .sum("total_cost")
)

In [34]:
# Start a streaming query that writes the complete aggregate to the memory
# sink under the query name "customer_purchases".
(
    purchaseByCustomerPerHour.writeStream
    .format("memory")
    .queryName("customer_purchases")
    .outputMode("complete")
    .start()
)

<pyspark.sql.streaming.StreamingQuery at 0x7f5d33466b50>

In [68]:
# Show the five rows with the largest summed cost in customer_purchases.
top_purchases_sql = """
    SELECT *
    FROM customer_purchases
    ORDER BY `sum(total_cost)` DESC
"""
spark.sql(top_purchases_sql).show(5)

+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   17450.0|[2011-09-19 17:00...|          71601.44|
|      null|[2011-11-07 16:00...| 64777.81999999999|
|      null|[2011-11-14 16:00...|53812.909999999996|
|      null|[2011-03-28 17:00...| 33521.39999999998|
|      null|[2010-12-09 16:00...| 33425.12000000001|
+----------+--------------------+------------------+
only showing top 5 rows



In [70]:
# Output goes to the console where Jupyter Lab/Notebook was launched,
# not to this notebook cell.
(
    purchaseByCustomerPerHour.writeStream
    .format("console")
    .queryName("customer_purchases_2")
    .outputMode("complete")
    .start()
)

<pyspark.sql.streaming.StreamingQuery at 0x7f5d31c3a150>

In [None]:
# Stop every streaming query that is currently active on this session.
for query in spark.streams.active:
    query.stop()

### MLlib

In [72]:
# Print the column names and types of the batch DataFrame.
staticDataFrame.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [84]:
from pyspark.sql.functions import date_format, col
# Fill nulls with 0, add the invoice date formatted as "EEEE" in a
# day_of_week column, and coalesce the result to 5 partitions.
preppedDataFrame = (
    staticDataFrame
    .na.fill(0)
    .withColumn("day_of_week", date_format(col("InvoiceDate"), "EEEE"))
    .coalesce(5)  # coalesce to numPartitions partitions
)

In [82]:
# Check how many partitions back the prepared DataFrame.
preppedDataFrame.rdd.getNumPartitions()

5

In [92]:
# Split by invoice date: rows before 2011-07-01 are used for training,
# rows on or after that date for testing.
trainDataFrame = preppedDataFrame.filter("InvoiceDate < '2011-07-01'")
testDataFrame = preppedDataFrame.filter("InvoiceDate >= '2011-07-01'")

# Report the row count of each split.
print('Train size: ', trainDataFrame.count())
print('Test size: ', testDataFrame.count())

Train size:  245903
Test size:  296006


In [134]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql import DataFrame

def build_pipeline() -> Pipeline:
    """Build the unfitted feature-engineering pipeline.

    Stages, in order:
      1. ``StringIndexer``: ``day_of_week`` -> ``day_of_week_index``
      2. ``OneHotEncoder``: ``day_of_week_index`` -> ``day_of_week_encoded``
      3. ``VectorAssembler``: ``UnitPrice``, ``Quantity`` and
         ``day_of_week_encoded`` -> ``features``

    Returns:
        An unfitted ``Pipeline``. The caller chooses the DataFrame to fit it
        on by calling ``.fit(...)`` on the returned object.
    """
    indexer = (
        StringIndexer()
        .setInputCol("day_of_week")
        .setOutputCol("day_of_week_index")
    )
    encoder = (
        OneHotEncoder()
        .setInputCol("day_of_week_index")
        .setOutputCol("day_of_week_encoded")
    )
    vectorAssembler = (
        VectorAssembler()
        .setInputCols(["UnitPrice", "Quantity", "day_of_week_encoded"])
        .setOutputCol("features")
    )

    # Deliberately no fit() here: the builder must not depend on a notebook
    # global, and a fitted model that is not returned is wasted work.
    transformationPipeline = Pipeline().setStages(
        [indexer, encoder, vectorAssembler]
    )
    return transformationPipeline

In [135]:
# Build the pipeline, fit it on the training split only, then apply the
# fitted pipeline to that same split.
transformationPipeline = build_pipeline()

fittedPipeline = transformationPipeline.fit(trainDataFrame)
transformedTraining = fittedPipeline.transform(trainDataFrame)

# Cache the transformed training data; it is read again when fitting KMeans
# and when computing the training cost.
transformedTraining.cache()


In [144]:
import time

from pyspark.ml.clustering import KMeans


# K-means with k=20 and a fixed seed.
kmeans = KMeans(k=20, seed=1)

# Time only the model fit.
start = time.perf_counter()
kmModel = kmeans.fit(transformedTraining)
print("Elapsed time : ", time.perf_counter() - start)

# NOTE(review): computeCost is deprecated in newer Spark releases (reportedly
# removed in 3.0) -- confirm the Spark version before upgrading.
train_cost = kmModel.computeCost(transformedTraining)
print("Training cost: ", train_cost)

# Transform the test split with the fitted pipeline and compute the same
# cost on it.
transformedTest = fittedPipeline.transform(testDataFrame)
test_cost = kmModel.computeCost(transformedTest)
print("Test cost: ", test_cost)

Elapsed time :  1.791419599991059
Training cost:  84553739.96537484
Test cost:  517507094.72221166


### Lower-Level APIs

- RDDs expose physical execution characteristics (e.g. partitions).
- They should not be used unless strictly necessary, which is almost never.
    

In [140]:
# Build a Spark DataFrame from in-memory raw data: parallelize a list of
# Row objects through the SparkContext, then convert the result with toDF().
from pyspark.sql import Row
spark.sparkContext.parallelize([Row(1), Row(2), Row(3)]).toDF()

DataFrame[_1: bigint]