In [1]:
from pyspark import SparkContext

In [2]:
import os

# Root of the Spark-The-Definitive-Guide data checkout. Override it with the
# SPARK_GUIDE_DATA_DIR environment variable instead of editing this
# machine-specific default path.
SPARK_GUIDE_DATA_DIR = os.environ.get(
    "SPARK_GUIDE_DATA_DIR",
    "/Users/jhnam/workspace/Spark-The-Definitive-Guide/data",
)

# Read the 2015 flight summary CSV. The header row supplies the column names,
# and inferSchema lets Spark pick column types (e.g. `count` comes back as int).
fd2015 = spark.read \
                .option("inferSchema", "true") \
                .option("header", "true") \
                .csv(f"{SPARK_GUIDE_DATA_DIR}/flight-data/csv/2015-summary.csv")

In [3]:
# Peek at the first three rows; they come back to the driver as a list of Rows.
fd2015.head(3)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

In [4]:
# Show the physical plan for sorting by `count`; explain() only prints the plan.
fd2015.orderBy("count").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#19 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(count#19 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=33]
      +- FileScan csv [DEST_COUNTRY_NAME#17,ORIGIN_COUNTRY_NAME#18,count#19] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/jhnam/workspace/Spark-The-Definitive-Guide/data/flight-dat..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




In [5]:
# Reduce post-shuffle partitions to 5. The earlier sort plan used
# rangepartitioning(..., 200), i.e. the default of 200.
spark.conf.set("spark.sql.shuffle.partitions", "5")

# Two rows with the smallest `count`.
fd2015.orderBy("count").head(2)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

In [6]:
# Expose the DataFrame to SQL as a session-scoped temporary view named fd2015.
fd2015.createOrReplaceTempView(name="fd2015")

In [7]:
# The same per-destination row count written two ways: SQL over the temp view,
# then the DataFrame API. The SQL result column is named `count(1)`, while
# GroupedData.count() names it `count`.
sqlway = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM fd2015
GROUP BY DEST_COUNTRY_NAME
""")
sqlway.show()

df_way = (
    fd2015
    .groupBy("DEST_COUNTRY_NAME")
    .count()
)
df_way.show()

+--------------------+--------+
|   DEST_COUNTRY_NAME|count(1)|
+--------------------+--------+
|             Moldova|       1|
|             Bolivia|       1|
|             Algeria|       1|
|Turks and Caicos ...|       1|
|            Pakistan|       1|
|    Marshall Islands|       1|
|            Suriname|       1|
|              Panama|       1|
|         New Zealand|       1|
|             Liberia|       1|
|             Ireland|       1|
|              Zambia|       1|
|            Malaysia|       1|
|               Japan|       1|
|    French Polynesia|       1|
|           Singapore|       1|
|             Denmark|       1|
|               Spain|       1|
|             Bermuda|       1|
|            Kiribati|       1|
+--------------------+--------+
only showing top 20 rows

+--------------------+-----+
|   DEST_COUNTRY_NAME|count|
+--------------------+-----+
|             Moldova|    1|
|             Bolivia|    1|
|             Algeria|    1|
|Turks and Caicos ...|    1|
|  

In [8]:
# Print both physical plans. Each is a partial HashAggregate, then a hash
# exchange, then a final HashAggregate.
for plan_owner in (sqlway, df_way):
    plan_owner.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#17, 5), ENSURE_REQUIREMENTS, [plan_id=145]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#17] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/jhnam/workspace/Spark-The-Definitive-Guide/data/flight-dat..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#17, 5), ENSURE_REQUIREMENTS, [plan_id=158]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#17] Batched: false, DataFilters: [], Format: CSV, Location: InMe

In [9]:
# Largest single `count` value, computed in SQL.
spark.sql("SELECT max(count) from fd2015").head(1)

[Row(max(count)=370002)]

In [10]:
# Import the functions module under an alias. `from pyspark.sql.functions
# import max` would shadow Python's built-in max() for every later cell in
# this kernel.
from pyspark.sql import functions as F

# DataFrame-API equivalent of the SQL max above. The result column is still
# named "max(count)".
fd2015.select(F.max("count")).take(1)

[Row(max(count)=370002)]

In [11]:
# Top five destination countries ranked by their summed `count`, in SQL.
top_destinations_query = """
select dest_country_name, sum(count) as destination_total
from fd2015
group by dest_country_name
order by sum(count) desc
limit 5
"""
maxSql = spark.sql(top_destinations_query)

maxSql.show()

+-----------------+-----------------+
|dest_country_name|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [12]:
from pyspark.sql.functions import desc

# DataFrame-API version of the SQL top-five query. GroupedData.sum names its
# output "sum(count)", so rename it to match the SQL alias.
max_df = (
    fd2015
    .groupBy("dest_country_name")
    .sum("count")
    .withColumnRenamed("sum(count)", "destination_total")
    .orderBy(desc("destination_total"))
    .limit(5)
)

max_df.show()
max_df.explain()

+-----------------+-----------------+
|dest_country_name|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- TakeOrderedAndProject(limit=5, orderBy=[destination_total#108L DESC NULLS LAST], output=[dest_country_name#17,destination_total#108L])
   +- HashAggregate(keys=[dest_country_name#17], functions=[sum(count#19)])
      +- Exchange hashpartitioning(dest_country_name#17, 5), ENSURE_REQUIREMENTS, [plan_id=358]
         +- HashAggregate(keys=[dest_country_name#17], functions=[partial_sum(count#19)])
            +- FileScan csv [DEST_COUNTRY_NAME#17,count#19] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/jhnam/workspace/Spark-The-Definitive-Guide/dat

In [2]:
import os

# Root of the Spark-The-Definitive-Guide data checkout. Override it with the
# SPARK_GUIDE_DATA_DIR environment variable instead of editing this
# machine-specific default path.
SPARK_GUIDE_DATA_DIR = os.environ.get(
    "SPARK_GUIDE_DATA_DIR",
    "/Users/jhnam/workspace/Spark-The-Definitive-Guide/data",
)

# Batch read of every daily retail CSV. The inferred schema is kept in
# static_schema so the streaming reader can reuse it, because file-based
# streaming sources need an explicit schema by default.
static_df = spark.read \
        .option("inferSchema", "true") \
        .option("header", "true") \
        .csv(f"{SPARK_GUIDE_DATA_DIR}/retail-data/by-day/*.csv")

static_df.createOrReplaceTempView("retail_data")
static_schema = static_df.schema

static_schema

                                                                                

StructType([StructField('InvoiceNo', StringType(), True), StructField('StockCode', StringType(), True), StructField('Description', StringType(), True), StructField('Quantity', IntegerType(), True), StructField('InvoiceDate', TimestampType(), True), StructField('UnitPrice', DoubleType(), True), StructField('CustomerID', DoubleType(), True), StructField('Country', StringType(), True)])

In [3]:
from pyspark.sql.functions import window, col

# Batch version: total spend (UnitPrice * Quantity) per customer, bucketed
# into 1-day windows on InvoiceDate. Only the first 5 rows are shown.
(
    static_df
    .selectExpr("CustomerId", "(UnitPrice * Quantity) as total_cost", "InvoiceDate")
    .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
    .sum("total_cost")
    .show(5)
)



+----------+--------------------+-----------------+
|CustomerId|              window|  sum(total_cost)|
+----------+--------------------+-----------------+
|   16057.0|{2011-12-05 09:00...|            -37.6|
|   14126.0|{2011-11-29 09:00...|643.6300000000001|
|   13500.0|{2011-11-16 09:00...|497.9700000000001|
|   17160.0|{2011-11-08 09:00...|516.8499999999999|
|   15608.0|{2011-11-11 09:00...|            122.4|
+----------+--------------------+-----------------+
only showing top 5 rows



                                                                                

In [15]:
import os

# Root of the Spark-The-Definitive-Guide data checkout. Override it with the
# SPARK_GUIDE_DATA_DIR environment variable instead of editing this
# machine-specific default path.
SPARK_GUIDE_DATA_DIR = os.environ.get(
    "SPARK_GUIDE_DATA_DIR",
    "/Users/jhnam/workspace/Spark-The-Definitive-Guide/data",
)

# Stream the same daily CSVs, reusing the batch-inferred schema.
# maxFilesPerTrigger=1 makes each micro-batch pick up at most one new file.
streaming_df = spark.readStream\
                    .schema(static_schema)\
                    .option("maxFilesPerTrigger", 1)\
                    .format("csv")\
                    .option("header", "true")\
                    .load(f"{SPARK_GUIDE_DATA_DIR}/retail-data/by-day/*.csv")

In [16]:
# Confirm that readStream produced a streaming (unbounded) DataFrame.
streaming_df.isStreaming

True

In [17]:
# Streaming counterpart of the batch aggregation: spend per customer in
# 1-day windows. This is lazy; nothing runs until a writeStream is started.
# NOTE(review): despite the name, the window is "1 day", not an hour. The name
# is kept because later cells reference it.
purchaseByCustomerPerHour = (
    streaming_df
    .selectExpr("CustomerId", "(UnitPrice * Quantity) as total_cost", "InvoiceDate")
    .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
    .sum("total_cost")
)

In [18]:
# Start the streaming aggregation into the memory sink. This creates the
# in-memory table "customer_purchases" that the following SQL cell queries;
# with this cell commented out, that query fails on a fresh kernel.
# Complete output mode re-emits the whole aggregate on every trigger.
# Keep the handle so the query can be stopped explicitly (memory_query.stop()).
memory_query = purchaseByCustomerPerHour.writeStream\
                        .format("memory")\
                        .queryName("customer_purchases")\
                        .outputMode("complete")\
                        .start()
memory_query

23/03/13 16:13:28 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/vf/p_2t8d9s3dgc1k68_4bwc6r80000gn/T/temporary-0487e923-cc53-45ac-b6ce-d0da32dde47d. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/03/13 16:13:28 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.StreamingQuery at 0x112f80590>

In [19]:
# Rank rows of the memory-sink table by total spend. The table may be empty if
# no micro-batch has completed yet, which is presumably why the saved output
# here is empty.
ranked_purchases = spark.sql("""
select * from customer_purchases order by `sum(total_cost)` desc
""")
ranked_purchases.show()

+----------+------+---------------+
|CustomerId|window|sum(total_cost)|
+----------+------+---------------+
+----------+------+---------------+



In [20]:
# Send the same aggregation to the console sink, which prints each micro-batch
# to the driver's stdout. Keep the handle so the query can be stopped with
# console_query.stop(). Otherwise it runs until the session is torn down
# underneath it, which appears to be what produced the "stream execution
# thread" exception in this cell's saved output.
console_query = purchaseByCustomerPerHour.writeStream\
                        .format("console")\
                        .queryName("customer_purchases_2")\
                        .outputMode("complete")\
                        .start()
console_query

23/03/13 16:14:50 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/vf/p_2t8d9s3dgc1k68_4bwc6r80000gn/T/temporary-7e1cf37e-576c-43c0-8704-18180677a333. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/03/13 16:14:50 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.StreamingQuery at 0x112f82b90>

-------------------------------------------
Batch: 0
-------------------------------------------
+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   12921.0|{2010-12-01 09:00...|             322.4|
|   16583.0|{2010-12-01 09:00...|233.45000000000002|
|   17897.0|{2010-12-01 09:00...|            140.39|
|   12748.0|{2010-12-01 09:00...|              4.95|
|   15350.0|{2010-12-01 09:00...|            115.65|
|   17809.0|{2010-12-01 09:00...|              34.8|
|   13747.0|{2010-12-01 09:00...|              79.6|
|   16250.0|{2010-12-01 09:00...|            226.14|
|   15983.0|{2010-12-01 09:00...|            440.89|
|   17511.0|{2010-12-01 09:00...|           1825.74|
|   14001.0|{2010-12-01 09:00...|            301.24|
|   17460.0|{2010-12-01 09:00...|              19.9|
|   18074.0|{2010-12-01 09:00...|             489.6|
|   12868.0|{2010-12-01 09:00...|             203.3|
| 

-------------------------------------------
Batch: 6
-------------------------------------------
+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   13329.0|{2010-12-08 09:00...|             304.2|
|   16250.0|{2010-12-01 09:00...|            226.14|
|   17460.0|{2010-12-01 09:00...|              19.9|
|   13491.0|{2010-12-02 09:00...|              98.9|
|   14594.0|{2010-12-01 09:00...|254.99999999999997|
|   15899.0|{2010-12-06 09:00...|             56.25|
|   14865.0|{2010-12-02 09:00...|              37.2|
|   14800.0|{2010-12-05 09:00...| 555.8399999999999|
|   15235.0|{2010-12-05 09:00...| 85.55000000000001|
|   15078.0|{2010-12-06 09:00...| 475.1499999999999|
|   18041.0|{2010-12-02 09:00...| 428.9399999999999|
|   12471.0|{2010-12-02 09:00...|             -17.0|
|   12433.0|{2010-12-08 09:00...|1867.9800000000002|
|   17949.0|{2010-12-03 09:00...|            1314.0|
| 

-------------------------------------------
Batch: 12
-------------------------------------------
+----------+--------------------+-------------------+
|CustomerId|              window|    sum(total_cost)|
+----------+--------------------+-------------------+
|   17576.0|{2010-12-13 09:00...| 177.35000000000002|
|   15039.0|{2010-12-14 09:00...|  706.2500000000002|
|   16250.0|{2010-12-01 09:00...|             226.14|
|   14594.0|{2010-12-01 09:00...| 254.99999999999997|
|   15899.0|{2010-12-06 09:00...|              56.25|
|   17220.0|{2010-12-10 09:00...| 317.50000000000006|
|   14865.0|{2010-12-02 09:00...|               37.2|
|   14800.0|{2010-12-05 09:00...|  555.8399999999999|
|   14256.0|{2010-12-10 09:00...|  523.8599999999999|
|   12434.0|{2010-12-14 09:00...|-27.749999999999996|
|   18041.0|{2010-12-02 09:00...|  428.9399999999999|
|   17551.0|{2010-12-15 09:00...|             306.84|
|   16565.0|{2010-12-10 09:00...|              173.7|
|   17949.0|{2010-12-03 09:00...|     

-------------------------------------------
Batch: 18
-------------------------------------------
+----------+--------------------+-------------------+
|CustomerId|              window|    sum(total_cost)|
+----------+--------------------+-------------------+
|   17576.0|{2010-12-13 09:00...| 177.35000000000002|
|   15208.0|{2010-12-21 09:00...|               65.4|
|   15039.0|{2010-12-14 09:00...|  706.2500000000002|
|   16250.0|{2010-12-01 09:00...|             226.14|
|   14594.0|{2010-12-01 09:00...| 254.99999999999997|
|   15899.0|{2010-12-06 09:00...|              56.25|
|   17220.0|{2010-12-10 09:00...| 317.50000000000006|
|   14865.0|{2010-12-02 09:00...|               37.2|
|   18223.0|{2010-12-16 09:00...|  501.6899999999999|
|   13329.0|{2010-12-20 09:00...|-35.400000000000006|
|   14800.0|{2010-12-05 09:00...|  555.8399999999999|
|   14256.0|{2010-12-10 09:00...|  523.8599999999999|
|   12434.0|{2010-12-14 09:00...|-27.749999999999996|
|   18041.0|{2010-12-02 09:00...|  428

-------------------------------------------
Batch: 24
-------------------------------------------
+----------+--------------------+-------------------+
|CustomerId|              window|    sum(total_cost)|
+----------+--------------------+-------------------+
|   17576.0|{2010-12-13 09:00...| 177.35000000000002|
|   17368.0|{2011-01-06 09:00...|  563.1500000000001|
|   15208.0|{2010-12-21 09:00...|               65.4|
|   15039.0|{2010-12-14 09:00...|  706.2500000000002|
|   16250.0|{2010-12-01 09:00...|             226.14|
|   14594.0|{2010-12-01 09:00...| 254.99999999999997|
|   15899.0|{2010-12-06 09:00...|              56.25|
|   17220.0|{2010-12-10 09:00...| 317.50000000000006|
|   14865.0|{2010-12-02 09:00...|               37.2|
|   18223.0|{2010-12-16 09:00...|  501.6899999999999|
|   13329.0|{2010-12-20 09:00...|-35.400000000000006|
|   14800.0|{2010-12-05 09:00...|  555.8399999999999|
|   14256.0|{2010-12-10 09:00...|  523.8599999999999|
|   12434.0|{2010-12-14 09:00...|-27.7

-------------------------------------------
Batch: 30
-------------------------------------------
+----------+--------------------+-------------------+
|CustomerId|              window|    sum(total_cost)|
+----------+--------------------+-------------------+
|   15208.0|{2010-12-21 09:00...|               65.4|
|   15039.0|{2010-12-14 09:00...|  706.2500000000002|
|   16250.0|{2010-12-01 09:00...|             226.14|
|   14594.0|{2010-12-01 09:00...| 254.99999999999997|
|   15899.0|{2010-12-06 09:00...|              56.25|
|   14865.0|{2010-12-02 09:00...|               37.2|
|   18223.0|{2010-12-16 09:00...|  501.6899999999999|
|   14800.0|{2010-12-05 09:00...|  555.8399999999999|
|   14256.0|{2010-12-10 09:00...|  523.8599999999999|
|   12434.0|{2010-12-14 09:00...|-27.749999999999996|
|   13715.0|{2011-01-05 09:00...|  445.2200000000002|
|   16607.0|{2010-12-15 09:00...|             404.82|
|   13081.0|{2011-01-14 09:00...|-13.200000000000001|
|   15799.0|{2011-01-09 09:00...|     

-------------------------------------------
Batch: 36
-------------------------------------------
+----------+--------------------+-------------------+
|CustomerId|              window|    sum(total_cost)|
+----------+--------------------+-------------------+
|   15208.0|{2010-12-21 09:00...|               65.4|
|   15039.0|{2010-12-14 09:00...|  706.2500000000002|
|   16250.0|{2010-12-01 09:00...|             226.14|
|   14594.0|{2010-12-01 09:00...| 254.99999999999997|
|   15899.0|{2010-12-06 09:00...|              56.25|
|   17504.0|{2011-01-21 09:00...| 441.15000000000003|
|   14865.0|{2010-12-02 09:00...|               37.2|
|   18223.0|{2010-12-16 09:00...|  501.6899999999999|
|   14800.0|{2010-12-05 09:00...|  555.8399999999999|
|   14256.0|{2010-12-10 09:00...|  523.8599999999999|
|   12727.0|{2011-01-23 09:00...|              514.5|
|   17602.0|{2011-01-19 09:00...|             767.05|
|   12434.0|{2010-12-14 09:00...|-27.749999999999996|
|   13715.0|{2011-01-05 09:00...|  445

-------------------------------------------
Batch: 42
-------------------------------------------
+----------+--------------------+-------------------+
|CustomerId|              window|    sum(total_cost)|
+----------+--------------------+-------------------+
|   15208.0|{2010-12-21 09:00...|               65.4|
|   15039.0|{2010-12-14 09:00...|  706.2500000000002|
|   16250.0|{2010-12-01 09:00...|             226.14|
|   14594.0|{2010-12-01 09:00...| 254.99999999999997|
|   15899.0|{2010-12-06 09:00...|              56.25|
|   17504.0|{2011-01-21 09:00...| 441.15000000000003|
|   14865.0|{2010-12-02 09:00...|               37.2|
|   18223.0|{2010-12-16 09:00...|  501.6899999999999|
|   14334.0|{2011-01-24 09:00...| 352.41999999999996|
|   14800.0|{2010-12-05 09:00...|  555.8399999999999|
|   14256.0|{2010-12-10 09:00...|  523.8599999999999|
|   12727.0|{2011-01-23 09:00...|              514.5|
|   17602.0|{2011-01-19 09:00...|             767.05|
|   12434.0|{2010-12-14 09:00...|-27.7

-------------------------------------------
Batch: 48
-------------------------------------------
+----------+--------------------+-------------------+
|CustomerId|              window|    sum(total_cost)|
+----------+--------------------+-------------------+
|   14627.0|{2011-02-01 09:00...|-21.849999999999998|
|   15208.0|{2010-12-21 09:00...|               65.4|
|   15039.0|{2010-12-14 09:00...|  706.2500000000002|
|   14911.0|{2011-01-31 09:00...|             797.77|
|   16250.0|{2010-12-01 09:00...|             226.14|
|   12373.0|{2011-02-01 09:00...|              364.6|
|   14594.0|{2010-12-01 09:00...| 254.99999999999997|
|   15899.0|{2010-12-06 09:00...|              56.25|
|   17504.0|{2011-01-21 09:00...| 441.15000000000003|
|   14865.0|{2010-12-02 09:00...|               37.2|
|   18223.0|{2010-12-16 09:00...|  501.6899999999999|
|   14334.0|{2011-01-24 09:00...| 352.41999999999996|
|   14800.0|{2010-12-05 09:00...|  555.8399999999999|
|   14606.0|{2011-02-01 09:00...| 157.

-------------------------------------------
Batch: 54
-------------------------------------------
+----------+--------------------+-------------------+
|CustomerId|              window|    sum(total_cost)|
+----------+--------------------+-------------------+
|   14627.0|{2011-02-01 09:00...|-21.849999999999998|
|   15208.0|{2010-12-21 09:00...|               65.4|
|   15039.0|{2010-12-14 09:00...|  706.2500000000002|
|   14911.0|{2011-01-31 09:00...|             797.77|
|   16250.0|{2010-12-01 09:00...|             226.14|
|   12373.0|{2011-02-01 09:00...|              364.6|
|   16842.0|{2011-02-10 09:00...|  520.5699999999999|
|   14594.0|{2010-12-01 09:00...| 254.99999999999997|
|   15899.0|{2010-12-06 09:00...|              56.25|
|   17504.0|{2011-01-21 09:00...| 441.15000000000003|
|   14865.0|{2010-12-02 09:00...|               37.2|
|   18223.0|{2010-12-16 09:00...|  501.6899999999999|
|   14334.0|{2011-01-24 09:00...| 352.41999999999996|
|   12913.0|{2011-02-11 09:00...|     

-------------------------------------------
Batch: 60
-------------------------------------------
+----------+--------------------+-------------------+
|CustomerId|              window|    sum(total_cost)|
+----------+--------------------+-------------------+
|   14627.0|{2011-02-01 09:00...|-21.849999999999998|
|   14825.0|{2011-02-15 09:00...| 241.34000000000006|
|   15208.0|{2010-12-21 09:00...|               65.4|
|   15039.0|{2010-12-14 09:00...|  706.2500000000002|
|   14911.0|{2011-01-31 09:00...|             797.77|
|   16250.0|{2010-12-01 09:00...|             226.14|
|   12373.0|{2011-02-01 09:00...|              364.6|
|   16842.0|{2011-02-10 09:00...|  520.5699999999999|
|   14594.0|{2010-12-01 09:00...| 254.99999999999997|
|   15899.0|{2010-12-06 09:00...|              56.25|
|   17504.0|{2011-01-21 09:00...| 441.15000000000003|
|   14865.0|{2010-12-02 09:00...|               37.2|
|   18223.0|{2010-12-16 09:00...|  501.6899999999999|
|   14334.0|{2011-01-24 09:00...| 352.

-------------------------------------------
Batch: 66
-------------------------------------------
+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   15290.0|{2011-02-22 09:00...|             -1.65|
|   15208.0|{2010-12-21 09:00...|              65.4|
|   14911.0|{2011-01-31 09:00...|            797.77|
|   16842.0|{2011-02-10 09:00...| 520.5699999999999|
|   14594.0|{2010-12-01 09:00...|254.99999999999997|
|   15899.0|{2010-12-06 09:00...|             56.25|
|   17504.0|{2011-01-21 09:00...|441.15000000000003|
|   14865.0|{2010-12-02 09:00...|              37.2|
|   18223.0|{2010-12-16 09:00...| 501.6899999999999|
|   14334.0|{2011-01-24 09:00...|352.41999999999996|
|   12913.0|{2011-02-11 09:00...|             313.8|
|   17243.0|{2011-02-27 09:00...| 373.6499999999999|
|   18188.0|{2011-02-22 09:00...|             426.6|
|   17175.0|{2011-02-16 09:00...|            519.08|
|

-------------------------------------------
Batch: 72
-------------------------------------------
+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   15290.0|{2011-02-22 09:00...|             -1.65|
|   15208.0|{2010-12-21 09:00...|              65.4|
|   14911.0|{2011-01-31 09:00...|            797.77|
|   17652.0|{2011-03-03 09:00...|             222.3|
|   16842.0|{2011-02-10 09:00...| 520.5699999999999|
|   14594.0|{2010-12-01 09:00...|254.99999999999997|
|   15899.0|{2010-12-06 09:00...|             56.25|
|   17504.0|{2011-01-21 09:00...|441.15000000000003|
|   14865.0|{2010-12-02 09:00...|              37.2|
|   18223.0|{2010-12-16 09:00...| 501.6899999999999|
|   14334.0|{2011-01-24 09:00...|352.41999999999996|
|   12913.0|{2011-02-11 09:00...|             313.8|
|   17243.0|{2011-02-27 09:00...| 373.6499999999999|
|   17841.0|{2011-02-28 09:00...|227.95000000000005|
|

-------------------------------------------
Batch: 78
-------------------------------------------
+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   15290.0|{2011-02-22 09:00...|             -1.65|
|   14911.0|{2011-03-11 09:00...|               0.0|
|   15208.0|{2010-12-21 09:00...|              65.4|
|   14911.0|{2011-01-31 09:00...|            797.77|
|   17652.0|{2011-03-03 09:00...|             222.3|
|   16842.0|{2011-02-10 09:00...| 520.5699999999999|
|   14594.0|{2010-12-01 09:00...|254.99999999999997|
|   15899.0|{2010-12-06 09:00...|             56.25|
|   17504.0|{2011-01-21 09:00...|441.15000000000003|
|   14865.0|{2010-12-02 09:00...|              37.2|
|   18223.0|{2010-12-16 09:00...| 501.6899999999999|
|   14334.0|{2011-01-24 09:00...|352.41999999999996|
|   12913.0|{2011-02-11 09:00...|             313.8|
|   17961.0|{2011-03-11 09:00...|3.0999999999999996|
|

-------------------------------------------
Batch: 84
-------------------------------------------
+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   15290.0|{2011-02-22 09:00...|             -1.65|
|   14911.0|{2011-03-11 09:00...|               0.0|
|   15208.0|{2010-12-21 09:00...|              65.4|
|   14911.0|{2011-01-31 09:00...|            797.77|
|   15694.0|{2011-03-16 09:00...|            584.76|
|   17652.0|{2011-03-03 09:00...|             222.3|
|   16842.0|{2011-02-10 09:00...| 520.5699999999999|
|   14594.0|{2010-12-01 09:00...|254.99999999999997|
|   15899.0|{2010-12-06 09:00...|             56.25|
|   17504.0|{2011-01-21 09:00...|441.15000000000003|
|   14865.0|{2010-12-02 09:00...|              37.2|
|   18223.0|{2010-12-16 09:00...| 501.6899999999999|
|   14334.0|{2011-01-24 09:00...|352.41999999999996|
|   12913.0|{2011-02-11 09:00...|             313.8|
|

Exception in thread "stream execution thread for customer_purchases_2 [id = e17e1bc1-c794-4c59-87af-6f7b1a5c2106, runId = b4e5d374-94e6-4b77-8fd3-948cd0a97b2e]" org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:103)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:87)
	at org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef.deactivateInstances(StateStoreCoordinator.scala:119)
	at org.apache.spark.sql.streaming.StreamingQueryManager.notifyQueryTermination(StreamingQueryManager.scala:406)
	at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$3(StreamExecution.scala:357)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.UninterruptibleThread.runUnint

In [4]:
# Print the batch DataFrame's inferred schema as a tree.
static_df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [6]:
# A one-column DataFrame of 0..499 (spark.range's `id` renamed to "number"),
# then a column expression that adds 10 to every value.
df = spark.range(500).toDF("number")
df.select(df.number + 10).show()

+-------------+
|(number + 10)|
+-------------+
|           10|
|           11|
|           12|
|           13|
|           14|
|           15|
|           16|
|           17|
|           18|
|           19|
|           20|
|           21|
|           22|
|           23|
|           24|
|           25|
|           26|
|           27|
|           28|
|           29|
+-------------+
only showing top 20 rows



In [10]:
# collect() brings every row back to the driver. That is fine here for two rows.
spark.range(2).collect()

[Row(id=0), Row(id=1)]

In [11]:
# NOTE(review): the wildcard import puts every public pyspark.sql.types name
# into the notebook namespace. Prefer explicit imports
# (e.g. `from pyspark.sql.types import ByteType`) once no later cell is
# confirmed to rely on the other names.
from pyspark.sql.types import *
# ByteType is Spark SQL's 1-byte signed integer type.
b = ByteType()
b

ByteType()