In [75]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, count, lit,to_date,date_diff
from pyspark.ml.regression import IsotonicRegression

In [76]:
spark=SparkSession.builder.appName("ARIMA").getOrCreate()
data_path="file:///home/siwenyu/桌面/Spark-lab4/res/task1_1/part-00000-3a410553-5280-4da6-8b49-741df2889028-c000.csv"
df=spark.read.csv(data_path,header=True)
df.show()

+-----------+------------------+----------------+
|report_date|total_purchase_amt|total_redeem_amt|
+-----------+------------------+----------------+
|   20140824|         130195484|       191080151|
|   20140812|         258493673|       309754858|
|   20140728|         371762756|       345986909|
|   20140716|         394890140|       234775948|
|   20140704|         211649838|       264494550|
|   20140320|         365011495|       336076380|
|   20140404|         251895894|       200192637|
|   20140416|         387847838|       255914640|
|   20140428|         324937272|       327724735|
|   20140512|         325108597|       293952908|
|   20140524|         160073254|       154409868|
|   20140608|         302171269|       169525332|
|   20140620|         251582530|       286583065|
|   20131020|          47766681|        50884342|
|   20131104|         300027403|       130970051|
|   20131116|         118085705|        28996272|
|   20131128|         139760425|        61453591|


In [77]:
from pyspark.sql import functions as F

# preprocess
# date conversion to yyMMdd, but first select a base
base_date = F.lit("2013-07-01").cast("date")
df = df.withColumn("report_date", F.to_date(col("report_date"), "yyyyMMdd"))
# sequence based on base
df = df.withColumn("sequence", F.datediff(col("report_date"), base_date))
df.show()



+-----------+------------------+----------------+--------+
|report_date|total_purchase_amt|total_redeem_amt|sequence|
+-----------+------------------+----------------+--------+
| 2014-08-24|         130195484|       191080151|     419|
| 2014-08-12|         258493673|       309754858|     407|
| 2014-07-28|         371762756|       345986909|     392|
| 2014-07-16|         394890140|       234775948|     380|
| 2014-07-04|         211649838|       264494550|     368|
| 2014-03-20|         365011495|       336076380|     262|
| 2014-04-04|         251895894|       200192637|     277|
| 2014-04-16|         387847838|       255914640|     289|
| 2014-04-28|         324937272|       327724735|     301|
| 2014-05-12|         325108597|       293952908|     315|
| 2014-05-24|         160073254|       154409868|     327|
| 2014-06-08|         302171269|       169525332|     342|
| 2014-06-20|         251582530|       286583065|     354|
| 2013-10-20|          47766681|        50884342|     11

In [78]:
# only select columns: sequence, total_purchase_amt; sequence, total_redeem_amt, 2 dataframes
purchase = df.select("sequence", "total_purchase_amt").orderBy("sequence")
redeem = df.select("sequence", "total_redeem_amt").orderBy("sequence")
purchase.show()
redeem.show()

+--------+------------------+
|sequence|total_purchase_amt|
+--------+------------------+
|       0|          32488348|
|       1|          29037390|
|       2|          27270770|
|       3|          18321185|
|       4|          11648749|
|       5|          36751272|
|       6|           8962232|
|       7|          57258266|
|       8|          26798941|
|       9|          30696506|
|      10|          44075197|
|      11|          34183904|
|      12|          15164717|
|      13|          22615303|
|      14|          48128555|
|      15|          50622847|
|      16|          29015682|
|      17|          24234505|
|      18|          33680124|
|      19|          20439079|
+--------+------------------+
only showing top 20 rows

+--------+----------------+
|sequence|total_redeem_amt|
+--------+----------------+
|       0|         5525022|
|       1|         2554548|
|       2|         5953867|
|       3|         6410729|
|       4|         2763587|
|       5|         1616635|
| 

In [79]:
# preprocess for isotonic regression

# check the null values
purchase.where(col("total_purchase_amt").isNull()).show()
redeem.where(col("total_redeem_amt").isNull()).show()

# transfer amt to double
purchase = purchase.withColumn("total_purchase_amt", col("total_purchase_amt").cast("double"))
redeem = redeem.withColumn("total_redeem_amt", col("total_redeem_amt").cast("double"))
purchase.show()
redeem.show()


+--------+------------------+
|sequence|total_purchase_amt|
+--------+------------------+
+--------+------------------+

+--------+----------------+
|sequence|total_redeem_amt|
+--------+----------------+
+--------+----------------+

+--------+------------------+
|sequence|total_purchase_amt|
+--------+------------------+
|       0|       3.2488348E7|
|       1|        2.903739E7|
|       2|        2.727077E7|
|       3|       1.8321185E7|
|       4|       1.1648749E7|
|       5|       3.6751272E7|
|       6|         8962232.0|
|       7|       5.7258266E7|
|       8|       2.6798941E7|
|       9|       3.0696506E7|
|      10|       4.4075197E7|
|      11|       3.4183904E7|
|      12|       1.5164717E7|
|      13|       2.2615303E7|
|      14|       4.8128555E7|
|      15|       5.0622847E7|
|      16|       2.9015682E7|
|      17|       2.4234505E7|
|      18|       3.3680124E7|
|      19|       2.0439079E7|
+--------+------------------+
only showing top 20 rows

+--------+----------

In [97]:
# train isotonic regression model


# predict

from pyspark.sql.functions import date_diff, lit, to_date
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import VectorAssembler

max_date = purchase.selectExpr("max(sequence) as sequence").collect()[0]["sequence"]
print(max_date)
future_day_diff =[(max_date+i,) for i in range(1,31)]
future=spark.createDataFrame(future_day_diff,["sequence"])
future.show()

assembler = VectorAssembler(inputCols=["sequence"], outputCol="features")
future_df = assembler.transform(future).select("sequence")
future_df.show()

assembler = VectorAssembler(inputCols=["sequence"], outputCol="features")
purchase = assembler.transform(purchase).select("features", col("total_purchase_amt").alias("label"))

# add a featurecol in the dataframe with tuple
model_pur = IsotonicRegression(featuresCol="sequence", labelCol="total_purchase_amt")
model_red = IsotonicRegression(featuresCol="sequence", labelCol="total_redeem_amt")

purchase_pred=model_pur.fit(purchase).transform(future_df)

purchase_pred.show()
redeem_pred.show()





426
+--------+
|sequence|
+--------+
|     427|
|     428|
|     429|
|     430|
|     431|
|     432|
|     433|
|     434|
|     435|
|     436|
|     437|
|     438|
|     439|
|     440|
|     441|
|     442|
|     443|
|     444|
|     445|
|     446|
+--------+
only showing top 20 rows

+--------+
|sequence|
+--------+
|     427|
|     428|
|     429|
|     430|
|     431|
|     432|
|     433|
|     434|
|     435|
|     436|
|     437|
|     438|
|     439|
|     440|
|     441|
|     442|
|     443|
|     444|
|     445|
|     446|
+--------+
only showing top 20 rows



IllegalArgumentException: Output column features already exists.