In [1]:
# Load every daily retail CSV file as one static (batch) DataFrame.
# Each file starts with a header row, and column types are inferred from the data.
staticDataFrame = (
    spark.read
    .format("csv")
    .options(header="true", inferSchema="true")
    .load("../data/retail-data/by-day/*.csv")
)

# Register the data for Spark SQL as "retail_data". Also keep the inferred
# schema so the streaming reader later in the notebook can reuse it.
staticDataFrame.createOrReplaceTempView("retail_data")
staticSchema = staticDataFrame.schema


# COMMAND ----------

In [2]:
# Summary statistics (count, mean, stddev, min, max) for every column.
staticDataFrame.describe().show(n=20, truncate=True)

+-------+------------------+------------------+--------------------+-----------------+-----------------+------------------+-----------+
|summary|         InvoiceNo|         StockCode|         Description|         Quantity|        UnitPrice|        CustomerID|    Country|
+-------+------------------+------------------+--------------------+-----------------+-----------------+------------------+-----------+
|  count|            541909|            541909|              540455|           541909|           541909|            406829|     541909|
|   mean|  559965.752026781|27623.240210938104|             20713.0| 9.55224954743324|4.611113626089641|15287.690570239585|       null|
| stddev|13428.417280796697|16799.737628427683|                 NaN|218.0811578502335|96.75985306117963| 1713.600303321597|       null|
|    min|            536365|             10002| 4 PURPLE FLOCK D...|           -80995|        -11062.06|           12346.0|  Australia|
|    max|           C581569|                 m| 

In [3]:
# window(): bucket each row's InvoiceDate into fixed-length time windows
# (here 1 day), then group by that window together with the customer id.
from pyspark.sql.functions import window, column, desc, col

# Total spend (UnitPrice * Quantity) per customer per day on the static data;
# prints the first 5 groups.
(
    staticDataFrame
    .selectExpr(
        "CustomerId",
        "(UnitPrice * Quantity) as total_cost",
        "InvoiceDate",
    )
    .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
    .sum("total_cost")
    .show(5)
)


# COMMAND ----------

+----------+--------------------+-----------------+
|CustomerId|              window|  sum(total_cost)|
+----------+--------------------+-----------------+
|   16057.0|[2011-12-05 00:00...|            -37.6|
|   14126.0|[2011-11-29 00:00...|643.6300000000001|
|   13500.0|[2011-11-16 00:00...|497.9700000000001|
|   17160.0|[2011-11-08 00:00...|516.8499999999999|
|   15608.0|[2011-11-11 00:00...|            122.4|
+----------+--------------------+-----------------+
only showing top 5 rows



In [4]:
# Streaming counterpart of staticDataFrame: the same CSV files, read as a stream.
# It reuses the schema captured from the static read. maxFilesPerTrigger=1
# caps each trigger at one new file, so the data arrives incrementally.
streamingDataFrame = (
    spark.readStream
    .format("csv")
    .schema(staticSchema)
    .options(header="true", maxFilesPerTrigger=1)
    .load("../data/retail-data/by-day/*.csv")
)


# COMMAND ----------

In [5]:
# Streaming version of the per-customer daily spend aggregation computed
# earlier on the static data. No action runs here; it is only a lazy plan.
# NOTE(review): despite "PerHour" in the name, the window is "1 day". The name
# is kept because the writeStream cell references it.
purchaseByCustomerPerHour = (
    streamingDataFrame
    .selectExpr(
        "CustomerId",
        "(UnitPrice * Quantity) as total_cost",
        "InvoiceDate",
    )
    .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
    .sum("total_cost")
)


# COMMAND ----------

In [6]:
# Start the streaming aggregation and write its results to an in-memory table
# named "customer_purchases", which can be queried with spark.sql.
# "complete" output mode rewrites the whole aggregate table on every trigger.
# Keep the StreamingQuery handle so the query can be monitored
# (.status, .lastProgress) and stopped (.stop()). Discarding it leaves a
# running query with no convenient way to shut it down.
customerPurchasesQuery = (
    purchaseByCustomerPerHour.writeStream
    .format("memory")
    .queryName("customer_purchases")
    .outputMode("complete")
    .start()
)

# Display the handle, as the original cell did.
customerPurchasesQuery


# COMMAND ----------

<pyspark.sql.streaming.StreamingQuery at 0x7f5bca01e3d0>

In [7]:
# Query the in-memory sink table, highest spend first, and print the top 5 rows.
# The streaming query runs asynchronously. If no micro-batch has completed yet,
# the table is still empty (as in the saved output), so re-run this cell after
# a few triggers to see results.
spark.sql("""
  SELECT *
  FROM customer_purchases
  ORDER BY `sum(total_cost)` DESC
  """).show(n=5)


# COMMAND ----------

+----------+------+---------------+
|CustomerId|window|sum(total_cost)|
+----------+------+---------------+
+----------+------+---------------+



In [8]:
from pyspark.sql.functions import date_format, col

# Prepare the static data for ML:
#  - fillna(0) replaces nulls in numeric columns with 0. describe() above shows
#    CustomerID has fewer non-null values than there are rows.
#  - day_of_week holds InvoiceDate formatted with the "EEEE" pattern
#    (full weekday name).
#  - coalesce(5) reduces the partition count to at most 5.
preppedDataFrame = (
    staticDataFrame
    .fillna(0)
    .withColumn("day_of_week", date_format(col("InvoiceDate"), "EEEE"))
    .coalesce(5)
)


# COMMAND ----------

In [9]:
# Time-based train/test split. Rows before the cutoff train the model, and
# rows on or after it are held out for testing.
# A single shared constant keeps the two filters complementary if the date is
# changed. Rows with a null InvoiceDate fall into neither set.
TRAIN_TEST_CUTOFF = "2011-07-01"

trainDataFrame = preppedDataFrame.where(f"InvoiceDate < '{TRAIN_TEST_CUTOFF}'")
testDataFrame = preppedDataFrame.where(f"InvoiceDate >= '{TRAIN_TEST_CUTOFF}'")


# COMMAND ----------

In [10]:
from pyspark.ml.feature import StringIndexer

# Encode the string day_of_week column as a numeric day_of_week_index column.
indexer = StringIndexer(inputCol="day_of_week", outputCol="day_of_week_index")


# COMMAND ----------

In [11]:
from pyspark.ml.feature import OneHotEncoder

# One-hot encode the numeric day index into the day_of_week_encoded column.
encoder = OneHotEncoder(
    inputCol="day_of_week_index",
    outputCol="day_of_week_encoded",
)


# COMMAND ----------

In [12]:
## Collect the independent variables into one feature vector (VectorAssembler)
from pyspark.ml.feature import VectorAssembler

# UnitPrice, Quantity and the one-hot day vector are combined into "features".
vectorAssembler = VectorAssembler(
    inputCols=["UnitPrice", "Quantity", "day_of_week_encoded"],
    outputCol="features",
)


# COMMAND ----------

In [13]:
from pyspark.ml import Pipeline

# Chain the feature stages: index the day name, one-hot encode it, then
# assemble the feature vector.
transformationPipeline = Pipeline(stages=[indexer, encoder, vectorAssembler])


# COMMAND ----------

In [14]:
# Fit the feature pipeline on the training data only.
fittedPipeline = transformationPipeline.fit(dataset=trainDataFrame)


# COMMAND ----------

In [15]:
# Apply the fitted feature pipeline to the training data.
# KMeans (fitted later) makes repeated passes over its input. Caching the
# transformed frame avoids recomputing the CSV read and the feature pipeline
# on every pass.
transformedTraining = fittedPipeline.transform(trainDataFrame).cache()


# COMMAND ----------

In [16]:
## KMeans clustering
from pyspark.ml.clustering import KMeans

# 20 clusters, with a fixed seed so repeated runs are reproducible.
kmeans = KMeans(k=20, seed=1)


# COMMAND ----------

In [17]:
# Train the clustering model on the transformed training data.
kmModel = kmeans.fit(dataset=transformedTraining)


# COMMAND ----------

In [18]:
# Apply the same fitted feature pipeline to the held-out test data.
# NOTE(review): no visible cell uses transformedTest (kmModel is never evaluated on it).
transformedTest = fittedPipeline.transform(dataset=testDataFrame)


# COMMAND ----------

In [19]:
from pyspark.sql import Row

# Build a tiny DataFrame from an RDD of unnamed Rows (values 1, 2, 3).
# Because the Rows have no field names, the single column is named `_1`, as
# the displayed schema shows.
spark.sparkContext.parallelize([Row(value) for value in range(1, 4)]).toDF()


# COMMAND ----------

DataFrame[_1: bigint]