In [2]:
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql.functions import *
from pyspark.sql import *
import os

In [5]:
spark = SparkSession.builder\
                    .master('local[5]')\
                    .appName('batch job to streaming job')\
                    .getOrCreate()

In [6]:
file_path = os.getcwd()+'/data/retail-data/by-day/*.csv'

In [29]:
# 배치 방식
static_df = spark.read.format('csv')\
            .option('header', 'true')\
            .option('inferSchema', 'true')\
            .load(file_path)

select_df = static_df.select(col('CustomerID').alias('customer_id'), (col('UnitPrice')*col('Quantity')).alias('total_cost'),
                             col('InvoiceNo').alias('invoice_no'), col('InvoiceDate').alias('invoice_date'))\
                                 .withColumn('invoice_date', to_date(col('invoice_date')))

agg_df = select_df.groupBy('customer_id', 'invoice_date').agg(sum('total_cost').alias('sum_total_cost'))
agg_df.sort(col('sum_total_cost').desc()).show()

+-----------+------------+------------------+
|customer_id|invoice_date|    sum_total_cost|
+-----------+------------+------------------+
|    17450.0|  2011-09-20|          71601.44|
|       null|  2011-11-14|          55316.08|
|       null|  2011-11-07|          42939.17|
|       null|  2011-03-29| 33521.39999999998|
|       null|  2011-12-08|31975.590000000007|
|    18102.0|  2011-09-15|31661.540000000005|
|       null|  2010-12-21|31347.479999999938|
|    18102.0|  2011-10-21|          29693.82|
|    18102.0|  2010-12-07|          25920.37|
|    14646.0|  2011-10-20|25833.559999999994|
|       null|  2010-12-10|25399.560000000012|
|       null|  2010-12-17|25371.769999999768|
|       null|  2011-11-25|24148.069999999992|
|       null|  2011-11-29|23744.250000000055|
|    12415.0|  2011-06-15| 23426.81000000001|
|       null|  2010-12-06|23395.099999999904|
|       null|  2011-08-30| 23032.59999999993|
|       null|  2010-12-03| 23021.99999999999|
|    15749.0|  2011-01-11|        

In [31]:
# 스트리밍 방식
staticSchema = static_df.schema
stream_df = spark.readStream.format('csv')\
            .schema(staticSchema)\
            .option('header', 'true')\
            .option('maxFilesPerTrigger', 1)\
            .load(file_path)

print("is streaming type dataframe? :", stream_df.isStreaming)

is streaming type dataframe? : True


In [33]:
select_df = stream_df.select(col('CustomerID').alias('customer_id'), (col('UnitPrice')*col('Quantity')).alias('total_cost'),
                             col('InvoiceNo').alias('invoice_no'), col('InvoiceDate').alias('invoice_date'))\
                                 .withColumn('invoice_date', to_date(col('invoice_date')))

agg_df = select_df.groupBy('customer_id', 'invoice_date').agg(sum('total_cost').alias('sum_total_cost'))

processed_stream_df = agg_df

In [35]:
processed_stream_df.writeStream\
    .format('memory')\
    .queryName('customer_purchases')\
    .outputMode('complete')\
    .start()

"""
.format('memory')\  # 인메모리 테이블에 저장
.queryName('customer_purchases')\ # 인메모리에 저장될 테이블명
.outputMode('complete')\  #모든 카운트 수행 결과를 테이블에 저장
.start()
"""

"\n.format('memory')\\  # 인메모리 테이블에 저장\n.queryName('customer_purchases')\\ # 인메모리에 저장될 테이블명\n.outputMode('complete')\\  #모든 카운트 수행 결과를 테이블에 저장\n.start()\n"

In [36]:
spark.sql("""
SELECT *
FROM customer_purchases
ORDER BY 'total_cost' DESC
""").show()

+-----------+------------+------------------+
|customer_id|invoice_date|    sum_total_cost|
+-----------+------------+------------------+
|    14825.0|  2010-12-06|184.10000000000002|
|    15808.0|  2011-01-04|            733.92|
|    14961.0|  2011-01-06|            -16.65|
|    17700.0|  2010-12-09|             -9.85|
|    12793.0|  2010-12-03|             131.8|
|    13458.0|  2010-12-22|             -16.5|
|    12868.0|  2011-01-27|182.45000000000002|
|    13777.0|  2010-12-08|             255.0|
|    15687.0|  2011-02-01| 379.1000000000001|
|    14616.0|  2011-01-26|237.60000000000002|
|    15640.0|  2010-12-02| 333.7199999999999|
|    12721.0|  2010-12-09|            -27.15|
|    16255.0|  2010-12-09|            374.25|
|    13369.0|  2010-12-09|308.28000000000003|
|    15165.0|  2010-12-01|            487.75|
|    13198.0|  2010-12-14|            -10.95|
|    16710.0|  2010-12-14|             149.5|
|    14662.0|  2011-01-27|101.05000000000004|
|    16029.0|  2010-12-21|        