# PySpark Installtion

In [1]:
# !apt-get update -qq > /dev/null
# !apt-get install openjdk-8-jdk-headless -qq > /dev/null
# !wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz
# !tar xf spark-3.0.0-bin-hadoop3.2.tgz
# !pip install -q findspark

# Mount Google Drive

In [2]:
# from google.colab import drive
# drive.mount('/content/drive')

### Create spark sessionn

In [3]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "spark-3.0.0-bin-hadoop3.2"

import findspark
findspark.init()

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

spark_conf = SparkConf()\
  .setAppName("YourTest")\
  .setMaster("local[*]")

sc = SparkContext.getOrCreate(spark_conf)

In [4]:
def create_spark_session():
    """Create Spark Session
    
    Return (obj): Spark configured session instance
    """
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

In [5]:
from pyspark.sql.functions import *

In [6]:
spark = create_spark_session()

In [7]:
main_data_link = "gs://cs-651-data-bucket"

In [8]:
df_blocks = spark.read.parquet(main_data_link + "/dataset-blk01000-01999/blocks")

In [9]:
# df_blocks.show(10)

In [10]:
df_blocks.select('file').distinct().collect()

[Row(file='blk01397.dat'),
 Row(file='blk01735.dat'),
 Row(file='blk01931.dat'),
 Row(file='blk01010.dat'),
 Row(file='blk01894.dat'),
 Row(file='blk01608.dat'),
 Row(file='blk01306.dat'),
 Row(file='blk01347.dat'),
 Row(file='blk01394.dat'),
 Row(file='blk01823.dat'),
 Row(file='blk01100.dat'),
 Row(file='blk01103.dat'),
 Row(file='blk01054.dat'),
 Row(file='blk01363.dat'),
 Row(file='blk01557.dat'),
 Row(file='blk01797.dat'),
 Row(file='blk01804.dat'),
 Row(file='blk01358.dat'),
 Row(file='blk01091.dat'),
 Row(file='blk01598.dat'),
 Row(file='blk01303.dat'),
 Row(file='blk01870.dat'),
 Row(file='blk01793.dat'),
 Row(file='blk01224.dat'),
 Row(file='blk01142.dat'),
 Row(file='blk01843.dat'),
 Row(file='blk01893.dat'),
 Row(file='blk01541.dat'),
 Row(file='blk01139.dat'),
 Row(file='blk01119.dat'),
 Row(file='blk01302.dat'),
 Row(file='blk01430.dat'),
 Row(file='blk01036.dat'),
 Row(file='blk01391.dat'),
 Row(file='blk01461.dat'),
 Row(file='blk01140.dat'),
 Row(file='blk01897.dat'),
 

In [11]:
# df_blocks.where(df_blocks.time > 1315366304).show(10)
# df_blocks.where(df_blocks.time > 1315366304)

In [12]:
df_prices = spark.read.parquet(main_data_link + "/price-history")

In [13]:
# df_prices.show(10)

In [14]:
# df_inputs = spark.read.parquet(main_data_link + "/dataset-blk01500-01599/inputs").limit(10000000)
df_inputs = spark.read.parquet(main_data_link + "/dataset-blk01000-01999/inputs")

In [15]:
# df_inputs.show(10)

In [16]:
df_inputs.where(df_inputs.transaction == df_inputs.parent_transaction).count()

100000000

In [17]:
df_inputs.count()

100000000

In [18]:
df_outputs = spark.read.parquet(main_data_link + "/dataset-blk01000-01999/outputs")

In [19]:
# df_outputs.show(10)

In [20]:
df_transactions = spark.read.parquet(main_data_link + "/dataset-blk01000-01999/transactions")

In [21]:
# df_transactions.show(10)

## Oberve Transaction Relationships

### Transaction vs blocks

In [22]:
# df_transactions.join(df_blocks, df_transactions.hash == df_blocks.hash, "inner").show()
df_transactions.join(df_blocks, df_transactions.hash == df_blocks.hash, "inner")

DataFrame[hash: string, block: string, is_coinbase: boolean, lock_time: bigint, _partition: int, hash: string, time: bigint, file: string, _partition: int]

In [23]:
# df_transactions.join(df_blocks, df_transactions.block == df_blocks.hash, "inner").show()
df_transactions.join(df_blocks, df_transactions.block == df_blocks.hash, "inner")

DataFrame[hash: string, block: string, is_coinbase: boolean, lock_time: bigint, _partition: int, hash: string, time: bigint, file: string, _partition: int]

### Transaction vs Outputs

In [24]:
# df_transactions.join(df_outputs, df_transactions.hash == df_outputs.transaction, "inner").show()
df_transactions.join(df_outputs, df_transactions.hash == df_outputs.transaction, "inner")

DataFrame[hash: string, block: string, is_coinbase: boolean, lock_time: bigint, _partition: int, transaction: string, index: int, value: bigint, address: string, _partition: int]

### Transaction vs Inputs

In [25]:
# df_transactions.join(df_inputs, df_transactions.hash == df_inputs.transaction, "inner").show()
df_transactions.join(df_inputs, df_transactions.hash == df_inputs.transaction, "inner")

DataFrame[hash: string, block: string, is_coinbase: boolean, lock_time: bigint, _partition: int, transaction: string, index: int, parent_transaction: string, vout: int, _partition: int]

In [26]:
# df_transactions.where(df_transactions.is_coinbase == True).show(10)
df_transactions.where(df_transactions.is_coinbase == True)

DataFrame[hash: string, block: string, is_coinbase: boolean, lock_time: bigint, _partition: int]

In [27]:
# df_transactions.where(df_transactions.lock_time > 0).show(10)
df_transactions.where(df_transactions.lock_time > 0)

DataFrame[hash: string, block: string, is_coinbase: boolean, lock_time: bigint, _partition: int]

In [28]:
# df_transactions.join(df_inputs, df_transactions.hash == df_inputs.transaction, "inner").where(df_transactions.is_coinbase == True).show()
df_transactions.join(df_inputs, df_transactions.hash == df_inputs.transaction, "inner").where(df_transactions.is_coinbase == True)

DataFrame[hash: string, block: string, is_coinbase: boolean, lock_time: bigint, _partition: int, transaction: string, index: int, parent_transaction: string, vout: int, _partition: int]

## Blocks vs Inputs

In [29]:
# df_blocks.join(df_inputs, df_inputs.transaction == df_blocks.hash, "inner").show()
df_blocks.join(df_inputs, df_inputs.transaction == df_blocks.hash, "inner")

DataFrame[hash: string, time: bigint, file: string, _partition: int, transaction: string, index: int, parent_transaction: string, vout: int, _partition: int]

## Blocks vs Outputs

In [30]:
# df_blocks.join(df_outputs, df_outputs.transaction == df_blocks.hash, "inner").show()
df_blocks.join(df_outputs, df_outputs.transaction == df_blocks.hash, "inner")

DataFrame[hash: string, time: bigint, file: string, _partition: int, transaction: string, index: int, value: bigint, address: string, _partition: int]

In [31]:
# df_blocks.join(df_outputs, df_outputs.address == df_blocks.hash, "inner").show()
df_blocks.join(df_outputs, df_outputs.address == df_blocks.hash, "inner")

DataFrame[hash: string, time: bigint, file: string, _partition: int, transaction: string, index: int, value: bigint, address: string, _partition: int]

## Blocks vs Prices

In [32]:
import numpy as np

In [33]:
df_blocks.join(df_prices, df_prices.timestamp == df_blocks.time, "inner").where(df_prices.open != np.NaN).count()

2235

In [34]:
df_blocks.join(df_prices, df_prices.timestamp == df_blocks.time, "inner").count()

2290

In [35]:
# df_prices.where(df_prices.open != np.NaN).show()
df_prices.where(df_prices.open != np.NaN)

DataFrame[timestamp: bigint, open: float, high: float, low: float, close: float, volume_btc: float, volume_usd: float, weighted_price: float]

In [36]:
df_prices.where(df_prices.open != np.NaN).count()

3613769

In [37]:
# import numpy as np

In [38]:
# np.NaN

In [39]:
from datetime import datetime

In [40]:
# a = datetime.fromtimestamp(1538167740)

In [41]:
# a.year, a.month, a.day

# Feature Engineering

In [42]:
get_year = udf(lambda x: datetime.fromtimestamp(x).year)
get_month = udf(lambda x: datetime.fromtimestamp(x).month)
get_day = udf(lambda x: datetime.fromtimestamp(x).day)

In [43]:
df = df_blocks.withColumn("year", get_year(df_blocks.time))
df = df.withColumn("month", get_month(df.time))
df = df.withColumn("day", get_day(df.time))
df = df.selectExpr(["hash as block_id", "time", "file", "_partition", "year", "month", "day"])
# df.show()

In [44]:
df_tr_inp = df_transactions.join(df_inputs, df_transactions.hash == df_inputs.transaction, "inner").select("hash", "block", "is_coinbase", "lock_time", "vout")
# df_tr_inp.show()

In [45]:
df_tr_inp_out = df_tr_inp.join(df_outputs, df_tr_inp.hash == df_outputs.transaction).select("hash", "block", "is_coinbase", "lock_time", "vout", "value")
# df_tr_inp_out.show()

In [46]:
df_tr_inp_out_bl = df_tr_inp_out.join(df, df_tr_inp_out.block == df.block_id).select("hash", "block", "is_coinbase", "lock_time", "vout", "value", "time", "year", "month", "day", "file")
# df_tr_inp_out_bl.show()


In [47]:
numerize_coinbase = udf(lambda x: int(x))

In [48]:
df_tr_inp_out_bl = df_tr_inp_out_bl.withColumn("is_coinbase", numerize_coinbase(df_tr_inp_out_bl.is_coinbase))


In [49]:
df_tr_inp_out_bl = df_tr_inp_out_bl.withColumn("network_fee", df_tr_inp_out_bl.value - df_tr_inp_out_bl.vout)
# df_tr_inp_out_bl.show(3)

In [50]:
# .dropDuplicates(['name', 'height']).show()
df_tr_inp_out_bl_dd = df_tr_inp_out_bl.dropDuplicates(['hash', 'block']).sort(df_tr_inp_out_bl.time.desc())

In [51]:
# df_tr_inp_out_bl_dd.show(3)

In [52]:
4+5

9

In [None]:
network_max_value = df_tr_inp_out_bl_dd.agg({"network_fee": "max"}).collect()[0][0]
network_max_value

5456513071805

In [None]:
network_min_value = df_tr_inp_out_bl_dd.agg({"network_fee": "min"}).collect()[0][0]
network_min_value

-7386

In [None]:
network_std_value = df_tr_inp_out_bl_dd.agg({"network_fee": "std"}).collect()[0][0]
network_std_value

5044428282.184236

In [None]:
network_mean_value = df_tr_inp_out_bl_dd.agg({"network_fee": "mean"}).collect()[0][0]
network_mean_value

118450519.78025664

In [None]:
normalize_network_min_max = udf(lambda x: (x - network_min_value)/(network_max_value - network_min_value))
normalize_network_std = udf(lambda x: (x - network_mean_value)/network_std_value)

In [None]:
df_tr_inp_out_bl_dd = df_tr_inp_out_bl_dd.withColumn("network_normalized", normalize_network_std(df_tr_inp_out_bl_dd.network_fee))
# df_tr_inp_out_bl_dd.show()

In [None]:
df_tr_inp_out_bl_dd.write.parquet(main_data_link + "/all_combo.parquet", mode="overwrite")

In [None]:
formed_df = spark.read.parquet(main_data_link + "/all_combo.parquet")
# formed_df.show()

In [None]:
# formed_df.where((formed_df.year == 2012) & (formed_df.month == 5)).show()
formed_df.where((formed_df.year == 2012) & (formed_df.month == 5))

DataFrame[hash: string, block: string, is_coinbase: string, lock_time: bigint, vout: int, value: bigint, time: bigint, year: string, month: string, day: string, file: string, network_fee: bigint, network_normalized: string]

In [None]:
formed_df.count()

45580499

In [None]:
df_prices = df_prices.withColumn("year", get_year(df_prices.timestamp))
df_prices = df_prices.withColumn("month", get_month(df_prices.timestamp))
df_prices = df_prices.withColumn("day", get_day(df_prices.timestamp))
# df_prices = df_prices.selectExpr(["hash as block_id", "time", "file", "_partition", "year", "month", "day"])
# df_prices.show()

In [None]:
df_prices = df_prices.where((df_prices.open != np.NaN))
# df_prices.show()

In [None]:
df_day_prices = df_prices.groupBy("year", "month", "day").agg(avg("weighted_price"), avg("volume_btc"), count("*")).withColumnRenamed("avg(weighted_price)","day_w_price").withColumnRenamed("avg(volume_btc)","day_volume_btc").withColumnRenamed("count(1)","day_samples")

# df_day_prices.show()

In [None]:
df_day_formed = formed_df.groupBy("year", "month", "day").agg(avg("network_fee"), count("*")).withColumnRenamed("avg(network_fee)","day_network_fee").withColumnRenamed("count(1)","day_samples_tr")
# df_day_formed.show()


In [None]:
day_network_std_value = df_day_formed.agg({"day_network_fee": "std"}).collect()[0][0]
day_network_mean_value = df_day_formed.agg({"day_network_fee": "mean"}).collect()[0][0]
normalize_day_network_std = udf(lambda x: (x - day_network_mean_value)/day_network_std_value)
df_day_network_norm = df_day_formed.withColumn("network_normalized", normalize_day_network_std(df_day_formed.day_network_fee))
# df_day_network_norm.show()

In [None]:
df_day_network_norm = df_day_network_norm.selectExpr(["year as year_n", "month as month_n", "day as day_n", "day_network_fee", "day_samples_tr", "network_normalized"])
# df_day_network_norm.show(4)

In [None]:
ddf = df_day_network_norm.join(df_day_prices, (df_day_network_norm.year_n == df_day_prices.year) & (df_day_network_norm.month_n == df_day_prices.month) & (df_day_network_norm.day_n == df_day_prices.day))
# ddf.show()

In [None]:
ddf.count()

391

In [None]:
# ddf.show()

In [None]:
ddf = ddf.select(["year", "month", "day", "network_normalized", "day_samples_tr", "day_w_price", "day_volume_btc", "day_samples"])

In [None]:
from pyspark.sql.types import IntegerType
ddf = ddf.withColumn("year", ddf["year"].cast(IntegerType()))
ddf = ddf.withColumn("month", ddf["month"].cast(IntegerType()))
ddf = ddf.withColumn("day", ddf["day"].cast(IntegerType()))

In [None]:
ddf = ddf.sort(ddf.year.desc(), ddf.month.desc(), ddf.day.desc())


In [None]:
# ddf.show()

In [None]:
from pyspark.sql.functions import monotonically_increasing_id

ddf = ddf.withColumn("id", monotonically_increasing_id())


In [None]:
# ddf.show()

In [None]:
from pyspark.sql import Window
from pyspark.sql import SparkSession, functions as F

window = Window.orderBy(F.col('id'))
ddf = ddf.withColumn('row_number', F.row_number().over(window))

In [None]:
# ddf.show()

In [None]:
# Assign Window
assign_7_day_window = udf(lambda x: int(x/7))
ddf = ddf.withColumn('week_id', assign_7_day_window(ddf.row_number))

In [None]:
# ddf.show()

In [None]:
unq_weeks = [x[0] for x in ddf.select("week_id").distinct().collect()]
unq_weeks

['7',
 '51',
 '15',
 '54',
 '11',
 '29',
 '42',
 '3',
 '30',
 '34',
 '8',
 '22',
 '28',
 '16',
 '35',
 '52',
 '0',
 '47',
 '43',
 '5',
 '31',
 '18',
 '27',
 '17',
 '26',
 '46',
 '6',
 '19',
 '23',
 '41',
 '55',
 '38',
 '40',
 '25',
 '44',
 '53',
 '33',
 '48',
 '9',
 '24',
 '32',
 '1',
 '20',
 '36',
 '10',
 '37',
 '49',
 '4',
 '39',
 '12',
 '13',
 '14',
 '21',
 '2',
 '50',
 '45']

In [None]:
ddf.write.parquet(main_data_link + "/weekly_finalized.parquet", mode="overwrite")

In [None]:
ddf = spark.read.parquet(main_data_link + "/weekly_finalized.parquet")
# ddf.show()

In [None]:
ddf.count()

391

In [None]:
import pandas as pd

In [None]:
main_df = list()

for uq_week in unq_weeks:
  pdf = ddf.where(ddf.week_id == uq_week).toPandas()
  pdf = pdf.sort_values(by=['row_number'])
  day_w_price_list = pdf["day_w_price"].tolist()
  day_volume_btc_list = pdf["day_volume_btc"].tolist()
  day_samples_list = pdf["day_samples"].tolist()
  day_samples_tr_list = pdf["day_samples_tr"].tolist()
  network_normalized_list = pdf["network_normalized"].tolist()
  week_label_id = pdf["week_id"].tolist()

  for i in range(7 - len(day_w_price_list)):
    day_w_price_list.append(0.0)
    day_volume_btc_list.append(0.0)
    day_samples_list.append(0.0)
    day_samples_tr_list.append(0.0)
    network_normalized_list.append(0.0)


  new_df = pd.DataFrame()
  for i in range(7):
    new_df["week_id"] = [week_label_id[0],]
    new_df["day_w_price_{0}".format(i)] = [day_w_price_list[i],]
    new_df["day_volume_btc_{0}".format(i)] = [day_volume_btc_list[i],]
    new_df["day_samples_list_{0}".format(i)] = [day_samples_list[i],]
    new_df["day_samples_tr_list_{0}".format(i)] = [day_samples_tr_list[i],]
    new_df["network_normalized_{0}".format(i)] = [network_normalized_list[i],]

  main_df.append(new_df)
  




In [None]:
pdf_final = pd.concat(main_df)

In [None]:
pdf_final["network_normalized_6"] = pdf_final["network_normalized_6"].apply(lambda x: float(x))
pdf_final["network_normalized_4"] = pdf_final["network_normalized_4"].apply(lambda x: float(x))
pdf_final["network_normalized_5"] = pdf_final["network_normalized_5"].apply(lambda x: float(x))

In [None]:
pdf_final

Unnamed: 0,week_id,day_w_price_0,day_volume_btc_0,day_samples_list_0,day_samples_tr_list_0,network_normalized_0,day_w_price_1,day_volume_btc_1,day_samples_list_1,day_samples_tr_list_1,...,day_w_price_5,day_volume_btc_5,day_samples_list_5,day_samples_tr_list_5,network_normalized_5,day_w_price_6,day_volume_btc_6,day_samples_list_6,day_samples_tr_list_6,network_normalized_6
0,7,7430.027208,1.918692,1411,21224,-0.8263104408081492,7150.83926,2.243633,1427,36141,...,7405.110024,2.908053,1433,22465,-0.562029,7328.986917,3.227925,1438.0,254832.0,-0.861857
0,51,5890.569028,11.811032,1362,24572,0.908567794145038,5748.121116,4.409447,1344,55529,...,5658.646806,12.08017,1431,176779,1.598936,5866.644507,10.896684,1407.0,20005.0,2.793125
0,15,10316.605587,2.684358,1399,10930,-0.6745704100402417,10220.789596,3.337336,1396,26776,...,10659.807005,7.515105,1424,4104,-0.79896,10564.875488,3.235529,1408.0,180917.0,-0.566583
0,54,4395.624039,5.976714,1382,189502,1.7631711833272004,4290.27228,5.397706,1336,185892,...,4037.267978,10.654303,1415,118665,2.83739,3914.694761,7.138007,1403.0,139437.0,1.717022
0,11,7880.584322,13.282693,1439,131870,-0.3860067055185775,7450.100642,3.4801,1436,222440,...,8064.5229,6.019777,1436,4959,-0.517398,8264.189837,3.515443,1435.0,286759.0,-0.699546
0,29,4980.67054,9.158753,1422,220978,-0.3106972448623404,4971.545803,11.063406,1437,43205,...,4088.429469,1.962899,1224,8845,-0.585784,3994.073714,5.822625,1400.0,28975.0,-0.821035
0,42,6499.971607,3.351327,1372,1390,-1.0215882952170845,6441.487447,5.080253,1375,33585,...,6705.525896,4.982766,1438,12304,2.490874,6738.87212,3.63664,1415.0,63196.0,0.270415
0,3,9922.395657,5.250464,1440,146275,-0.4325285795440584,10080.722378,3.356732,1439,137027,...,9403.167146,5.611669,1437,8555,-0.679669,9041.676245,6.647366,1439.0,141929.0,-0.845806
0,30,3908.236848,3.201487,1348,272306,-0.4835306433672619,3995.972001,4.595257,1359,25049,...,3793.163625,6.425718,1375,107625,-0.070846,3985.170884,9.889859,1389.0,196987.0,-0.259521
0,34,3575.310058,5.002983,1181,254952,-0.5498295518975886,3626.334447,2.781094,1145,50370,...,4006.822504,8.697562,1359,11582,-0.685845,4022.403776,5.502658,1353.0,7764.0,0.167691


In [None]:
sparkDF = spark.createDataFrame(pdf_final) 

In [None]:
# sparkDF.show()

In [None]:
weekly_price = ddf.selectExpr(["row_number", "week_id", "day_w_price as week_day_end_price"]).dropDuplicates(['week_id'])
weekly_price.show()

+----------+-------+------------------+
|row_number|week_id|week_day_end_price|
+----------+-------+------------------+
|        49|      7| 7430.027208443147|
|       357|     51|  5890.56902798332|
|       105|     15|10316.605586719308|
|       378|     54| 4395.624038630042|
|        77|     11| 7880.584321658921|
|       203|     29| 4980.670540295908|
|       294|     42| 6499.971607441805|
|        21|      3| 9922.395657009549|
|       210|     30|3908.2368477388377|
|       238|     34|3575.3100575601316|
|        56|      8| 7319.050657474795|
|       154|     22| 7896.830021300631|
|       196|     28| 5047.457163507227|
|       112|     16|10563.647525311186|
|       245|     35|3700.6787021324285|
|       364|     52| 6072.970271406686|
|         1|      0| 5294.226220364041|
|       329|     47|16200.092354329427|
|       301|     43|   6573.8308199165|
|        35|      5| 8847.166835312717|
+----------+-------+------------------+
only showing top 20 rows



In [None]:
weekly_price.write.parquet(main_data_link + "/weekly_labels.parquet", mode = "Overwrite") 

In [None]:
w_price = spark.read.parquet(main_data_link + "/weekly_labels.parquet")
w_price.count()

56

In [None]:
# sparkDF.show()

In [None]:
decrease_val = udf(lambda x: int(x) - 1)


sparkDF = sparkDF.withColumn("previous_week_id", decrease_val(sparkDF.week_id))
# sparkDF.show()

In [None]:
final_col_list  = list()
for i in range(7):
  final_col_list.append("day_w_price_{0}".format(i))
  final_col_list.append("day_volume_btc_{0}".format(i))
  final_col_list.append("day_samples_list_{0}".format(i))
  final_col_list.append("day_samples_tr_list_{0}".format(i))
  final_col_list.append("network_normalized_{0}".format(i))


final_col_list.append("week_day_end_price")

train_df = sparkDF.join(w_price, sparkDF.previous_week_id == w_price.week_id).select(final_col_list)
# train_df.show()

In [None]:
from pyspark.sql.types import IntegerType, FloatType

for col in final_col_list:
  train_df = train_df.withColumn(col, train_df[col].cast(FloatType()))


In [None]:
from pyspark.ml.feature import VectorAssembler


In [None]:
assembler = VectorAssembler(
    inputCols=final_col_list[:-1],
    outputCol="features")

In [None]:
output = assembler.transform(train_df).select('features','week_day_end_price')
output.show()

+--------------------+------------------+
|            features|week_day_end_price|
+--------------------+------------------+
|[7430.02734375,1....|         8111.7153|
|[5890.56884765625...|         6960.0317|
|[10316.60546875,2...|         10174.287|
|[4395.6240234375,...|         5617.7075|
|[7880.58447265625...|          9335.805|
|[4980.67041015625...|          5047.457|
|[6499.9716796875,...|         6658.8315|
|[9922.3955078125,...|          9846.396|
|[3908.23681640625...|         4980.6704|
|[3575.31005859375...|          3413.938|
|[7319.05078125,1....|         7430.0273|
|[7896.830078125,3...|         10724.805|
|[5047.45703125,5....|          5317.855|
|[10563.6474609375...|         10316.605|
|[3700.6787109375,...|           3575.31|
|[6072.97021484375...|          5890.569|
|[16200.0927734375...|         18318.875|
|[6573.8310546875,...|         6499.9717|
|[8847.1669921875,...|          8723.535|
|[3906.53442382812...|         3908.2368|
+--------------------+------------

In [None]:
train,test = output.randomSplit([0.75, 0.25])
# train,test = output.randomSplit([0.5, 0.5])

#Simple Linear Regressor

In [None]:
from pyspark.ml.regression import LinearRegression
lin_reg = LinearRegression(featuresCol = 'features', labelCol='week_day_end_price')
linear_model = lin_reg.fit(train)
print("Coefficients: " + str(linear_model.coefficients))
print("\nIntercept: " + str(linear_model.intercept))


Coefficients: [1.667550775867644,-9.616218238964247,7.87870632812789,0.0017980433676442573,-642.4923895794335,-0.922841499700496,-96.68455035125466,-6.51364434340936,0.0048556084124388305,476.10915093063335,1.5654816358245776,36.18545770208855,3.2258433604933665,0.0017278215949789045,-826.579690864425,-2.231855467231347,28.494113158206925,3.129104454281954,0.001682640578394212,921.7580013370822,1.6768565451481199,9.341080488081635,5.4489751583905175,-0.003290142697978702,-417.8948045851574,-1.269476657873392,-18.860230756653525,3.3199059859174893,8.76233625648121e-06,187.97057939596237,0.36631939594668994,-108.24940378326987,-6.527365455172211,0.0038284387939307263,284.81225238464947]

Intercept: -13363.58030417038


In [None]:
trainSummary = linear_model.summary
print("RMSE: %f" % trainSummary.rootMeanSquaredError)
print("\nr2: %f" % trainSummary.r2)

RMSE: 585.115035

r2: 0.955209


In [None]:
from  pyspark.sql.functions import abs
predictions = linear_model.transform(test)
x =((predictions['week_day_end_price']-predictions['prediction'])/predictions['week_day_end_price'])*100
predictions = predictions.withColumn('Accuracy',abs(x))
predictions.select("prediction","week_day_end_price","Accuracy","features").show()

+------------------+------------------+------------------+--------------------+
|        prediction|week_day_end_price|          Accuracy|            features|
+------------------+------------------+------------------+--------------------+
| 3447.328830945524|         4980.6704| 30.78584714387121|[3908.23681640625...|
| 8712.423141158135|         10724.805| 18.76380600839604|[7896.830078125,3...|
|24254.865809400526|         18318.875| 32.40368641306044|[16200.0927734375...|
|3001.8487799472096|         3908.2368|23.191737835694756|[3906.53442382812...|
| 4933.408397469797|         3508.6577| 40.60671625500794|[6365.11328125,2....|
| 6275.856839256947|         6665.8735| 5.850946523997639|[6658.83154296875...|
| 1582.585368284781|         3700.6787|57.235267044194124|[3773.259765625,8...|
|  9348.30243727726|          8358.547|11.841239596772134|[8257.533203125,3...|
| 9716.368963993118|         7880.5845| 23.29502967332693|[8358.546875,3.10...|
|11932.124225292166|          8257.533|4

#Random Forest Regressor

In [None]:
from pyspark.ml.regression import RandomForestRegressor
rf = RandomForestRegressor(featuresCol = 'features', labelCol='week_day_end_price')
rf_model = rf.fit(train)

In [None]:
from  pyspark.sql.functions import abs

predictions_rf = rf_model.transform(test)
x_rf =((predictions_rf['week_day_end_price']-predictions_rf['prediction'])/predictions_rf['week_day_end_price'])*100
predictions_rf = predictions_rf.withColumn('Accuracy',abs(x_rf))
predictions_rf.select("prediction","week_day_end_price","Accuracy","features").show()

+------------------+------------------+------------------+--------------------+
|        prediction|week_day_end_price|          Accuracy|            features|
+------------------+------------------+------------------+--------------------+
| 4080.753646850586|         4980.6704|18.068185388670045|[3908.23681640625...|
| 8720.291426440112|         10724.805| 18.69044070701067|[7896.830078125,3...|
|11852.519312608509|         18318.875| 35.29886899381917|[16200.0927734375...|
| 4259.150422973633|         3908.2368| 8.978821475052253|[3906.53442382812...|
| 6992.888114832758|         3508.6577| 99.30379886440905|[6365.11328125,2....|
|6408.9331472685835|         6665.8735|3.8545643947870625|[6658.83154296875...|
| 4765.478233642578|         3700.6787| 28.77308747603572|[3773.259765625,8...|
| 8711.745122365428|          8358.547| 4.225593905823827|[8257.533203125,3...|
| 8964.870415101663|         7880.5845|13.758953364533133|[8358.546875,3.10...|
| 9471.022994506307|          8257.533|1

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator_rf = RegressionEvaluator(
    labelCol="week_day_end_price", predictionCol="prediction", metricName="rmse")
rmse_rf = evaluator_rf.evaluate(predictions_rf)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse_rf)

Root Mean Squared Error (RMSE) on test data = 2407.71


#Decision Tree Regressor

In [None]:
from pyspark.ml.regression import DecisionTreeRegressor

dt = DecisionTreeRegressor(featuresCol = 'features', labelCol='week_day_end_price')
dt_model = dt.fit(train)

In [None]:
from  pyspark.sql.functions import abs

predictions_dt = dt_model.transform(test)
x_dt =((predictions_dt['week_day_end_price']-predictions_dt['prediction'])/predictions_dt['week_day_end_price'])*100
predictions_dt = predictions_dt.withColumn('Accuracy',abs(x_dt))
predictions_dt.select("prediction","week_day_end_price","Accuracy","features").show()

+------------------+------------------+------------------+--------------------+
|        prediction|week_day_end_price|          Accuracy|            features|
+------------------+------------------+------------------+--------------------+
| 3906.534423828125|         4980.6704|21.566092470961713|[3908.23681640625...|
| 9297.384114583334|         10724.805|13.309525110330044|[7896.830078125,3...|
|         14816.625|         18318.875|19.118259172574735|[16200.0927734375...|
|   4395.6240234375|         3908.2368|12.470769554835172|[3906.53442382812...|
|     6365.11328125|         3508.6577| 81.41163369460949|[6365.11328125,2....|
| 6579.897705078125|         6665.8735|1.2897908972422427|[6658.83154296875...|
|   4395.6240234375|         3700.6787|18.778861035573343|[3773.259765625,8...|
| 9297.384114583334|          8358.547|11.232062864794713|[8257.533203125,3...|
| 9297.384114583334|         7880.5845|17.978357402842914|[8358.546875,3.10...|
|10343.394897460938|          8257.533| 

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator_dt = RegressionEvaluator(
    labelCol="week_day_end_price", predictionCol="prediction", metricName="rmse")
rmse_dt = evaluator_dt.evaluate(predictions_dt)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse_dt)

Root Mean Squared Error (RMSE) on test data = 1759.94
