In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
from pyspark.sql.functions import max, desc, col, window, column, desc

In [2]:
spark = SparkSession.builder.master("local[6]") \
                    .appName('spark_toolset') \
                    .getOrCreate()

spark.conf.set("spark.sql.shuffle.partitions", "5")

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/27 10:27:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
staticDataFrame = spark.read.format("csv") \
                    .option("header", "true") \
                    .option("inferSchema", "true") \
                    .load("../data/retail-data/by-day/*.csv")

staticDataFrame.createOrReplaceTempView("retail_data")
staticSchema = staticDataFrame.schema

                                                                                

In [4]:
print(staticSchema)
staticDataFrame.show(5)

StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,StringType,true),StructField(UnitPrice,DoubleType,true),StructField(CustomerID,DoubleType,true),StructField(Country,StringType,true)))
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   580538|    23084|  RABBIT NIGHT LIGHT|      48|2011-12-05 08:38:00|     1.79|   14075.0|United Kingdom|
|   580538|    23077| DOUGHNUT LIP GLOSS |      20|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
|   580538|    22906|12 MESSAGE CARDS ...|      24|2011-12-05 08:38:00|     1.65|   14075.0|United Kingdom|
|   580538|    2191

In [5]:
staticDataFrame.selectExpr("CustomerId", "(UnitPrice*Quantity) as TotalPrice", "InvoiceDate") \
                .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day")) \
                .sum("TotalPrice").withColumnRenamed("sum(TotalPrice)", "TotalPrice_within_Window") \
                .show(5)



+----------+--------------------+------------------------+
|CustomerId|              window|TotalPrice_within_Window|
+----------+--------------------+------------------------+
|   14075.0|{2011-12-05 08:00...|      316.78000000000003|
|   18180.0|{2011-12-05 08:00...|                  310.73|
|   15358.0|{2011-12-05 08:00...|       830.0600000000003|
|   15392.0|{2011-12-05 08:00...|      304.40999999999997|
|   15290.0|{2011-12-05 08:00...|      263.02000000000004|
+----------+--------------------+------------------------+
only showing top 5 rows



                                                                                

In [6]:
streamingDataFrame = spark.readStream \
                        .schema(staticSchema) \
                        .option("maxFilesPerTrigger", "1") \
                        .format("csv") \
                        .option("header", "true") \
                        .load("../data/retail-data/by-day/*.csv")

                                                                                

In [7]:
streamingDataFrame.isStreaming

True

In [8]:
purchaseByCustomerPerHour = streamingDataFrame.selectExpr("CustomerId", "(UnitPrice * Quantity) as TotalPrice", "InvoiceDate") \
                    .groupBy( col("CustomerId"), window(col("InvoiceDate"), "1 day")) \
                    .sum("TotalPrice")

In [17]:
purchaseByCustomerPerHour.writeStream \
                            .format("memory") \
                            .queryName("customer_purchases") \
                            .outputMode("complete") \
                            .start()

23/05/27 11:04:26 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/wk/_nxf_xjj16s2_tbn7hkxcln80000gr/T/temporary-de14dc07-868a-43fd-83ed-26141fec0baf. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/05/27 11:04:26 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.StreamingQuery at 0x1118bdfa0>

                                                                                

In [18]:
purchaseByCustomerPerHour.writeStream \
                            .format("console") \
                            .queryName("customer_purchases_2") \
                            .outputMode("complete") \
                            .start()

23/05/27 11:04:29 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/wk/_nxf_xjj16s2_tbn7hkxcln80000gr/T/temporary-ee8c5ad1-fa66-4d28-a5b7-fbaaa6136c2c. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/05/27 11:04:29 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.StreamingQuery at 0x1118bdb20>

-------------------------------------------
Batch: 0
-------------------------------------------
+----------+--------------------+------------------+
|CustomerId|              window|   sum(TotalPrice)|
+----------+--------------------+------------------+
|   12921.0|{2010-12-01 08:00...|             322.4|
|   16583.0|{2010-12-01 08:00...|233.45000000000002|
|   17897.0|{2010-12-01 08:00...|            140.39|
|   12748.0|{2010-12-01 08:00...|              4.95|
|   15350.0|{2010-12-01 08:00...|            115.65|
|   17809.0|{2010-12-01 08:00...|              34.8|
|   13747.0|{2010-12-01 08:00...|              79.6|
|   16250.0|{2010-12-01 08:00...|            226.14|
|   15983.0|{2010-12-01 08:00...|            440.89|
|   17511.0|{2010-12-01 08:00...|           1825.74|
|   14001.0|{2010-12-01 08:00...|            301.24|
|   17460.0|{2010-12-01 08:00...|              19.9|
|   18074.0|{2010-12-01 08:00...|             489.6|
|   12868.0|{2010-12-01 08:00...|             203.3|
| 

In [20]:
for query in spark.streams.active:
    query.stop()

In [22]:
spark.sql("SELECT * FROM customer_purchases_2 ORDER BY `sum(TotalPrice)` DESC").show(5)

AnalysisException: Table or view not found: customer_purchases_2; line 1 pos 14;
'Sort ['sum(TotalPrice) DESC NULLS LAST], true
+- 'Project [*]
   +- 'UnresolvedRelation [customer_purchases_2], [], false


In [23]:
spark.sql("SELECT * FROM customer_purchases ORDER BY `sum(TotalPrice)` DESC").show(5)


+----------+--------------------+------------------+
|CustomerId|              window|   sum(TotalPrice)|
+----------+--------------------+------------------+
|   18102.0|{2010-12-07 08:00...|          25920.37|
|      null|{2010-12-06 08:00...|23395.099999999904|
|      null|{2010-12-03 08:00...| 23021.99999999999|
|      null|{2010-12-09 08:00...|15354.279999999955|
|      null|{2010-12-01 08:00...|12584.299999999988|
+----------+--------------------+------------------+
only showing top 5 rows



In [24]:
staticDataFrame.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [25]:
preppedDataFrame = staticDataFrame \
                    .na.fill(0) \
                    .withColumn("day_of_week", date_format(col("InvoiceDate"), "EEEE")) \
                    .coalesce(5)

TypeError: withColumn() takes 3 positional arguments but 4 were given