# Predictive Data Analytics

In [1]:
import os

from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.functions as F
from pyspark.sql.functions import col, split, when, sin, cos
from pyspark.sql.types import IntegerType, StringType
from pyspark import keyword_only
from pyspark.ml import Transformer, Pipeline
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, Tokenizer
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator 
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator 

import numpy as np
import math
from pprint import pprint

## Read Hive tables

### Connect to Hive

In [None]:
# Team identifier; it is embedded in the Spark application name
team = "team13"

# Path of the Hive warehouse directory in HDFS
warehouse = "project/hive/warehouse"

# Create (or reuse) a Hive-enabled session that runs on YARN
spark = (
    SparkSession.builder
    .appName("{} - spark ML".format(team))
    .master("yarn")
    .config("hive.metastore.uris", "thrift://hadoop-02.uni.innopolis.ru:9883")
    .config("spark.sql.warehouse.dir", warehouse)
    .config("spark.sql.avro.compression.codec", "snappy")
    .enableHiveSupport()
    .getOrCreate()
)

In [None]:
# Bare expression: as the last statement of a notebook cell it renders the session object
spark

### List all databases

In [None]:
# List the databases visible through the configured Hive metastore.
# spark.sql() takes exactly one statement, so no ";" terminator is passed:
# it is never required and older Spark SQL parsers reject it.
spark.sql("SHOW DATABASES").show()

### List all tables

In [None]:
# Print the catalog entries of every table in the project database
print(spark.catalog.listTables("team13_projectdb"))

### Read Hive tables

In [None]:
# Load the two partitioned Hive tables used for modelling, in this order:
# company objects first, then their funding rounds
objects, fund_rounds = (
    spark.read.format("avro").table(table_name)
    for table_name in (
        'team13_projectdb.objects_part',
        'team13_projectdb.funding_rounds_part',
    )
)

## ML Modeling

### Preprocessing the data

#### Feature selection


In [None]:
# Columns kept from the objects table
obj_features = [
    'id',
    'status',
    'investment_rounds',
    'invested_companies',
    'milestones',
    'relationships',
]

# Columns kept from the funding rounds table
fund_features = [
    'object_id',
    'funded_at',
    'funding_round_type',
    'participants',
    'is_first_round',
    'is_last_round',
]

# Regression target
label = 'raised_amount_usd'

In [None]:
# Narrow both tables to the selected columns; the funding rounds also keep the target
objects = objects.select(*obj_features)
fund_rounds = fund_rounds.select(*fund_features, label)

In [None]:
# Combine both tables into the single DataFrame used for the ML task.
# The right join keeps every funding round; the key columns are dropped afterwards.
final = (
    objects
    .join(fund_rounds, on=objects['id'] == fund_rounds['object_id'], how='right')
    .drop('id', 'object_id')
)

final.show(n=5)

In [None]:
# Break funded_at into integer year / month / day columns.
# Spark's date-part functions are used instead of splitting the date's string
# form on "-", which depends on the implicit date-to-string formatting.
funded_date = F.to_date("funded_at")
final = (
    final
    .withColumn("funded_year", F.year(funded_date))
    .withColumn("funded_month", F.month(funded_date))
    .withColumn("funded_day", F.dayofmonth(funded_date))
    .drop("funded_at")
)

# Drop every row that has a missing value in any column
final = final.na.drop()

# Rename the target column to "label"
final = final.withColumnRenamed("raised_amount_usd", "label")

In [None]:
# Preview the prepared DataFrame
final.show()

#### Building the Pipeline

In [None]:
# String columns that are indexed and one-hot encoded
categorical_cols = ['status', 'funding_round_type']

# Columns passed to the feature vector unchanged
numerical_cols = [
    'investment_rounds',
    'invested_companies',
    'milestones',
    'relationships',
    'funded_year',
    'participants',
    'is_first_round',
    'is_last_round',
]

# Cyclical columns and, position by position, the length of each cycle
cyclical_cols = ['funded_month', 'funded_day']
periods = [12, 31]

In [None]:
# Build a custom transformer to encode cyclical features
class CyclicTransformer(Transformer, HasInputCol, HasOutputCol, DefaultParamsReadable, DefaultParamsWritable):
    """Encode a cyclical numeric column as a sine/cosine pair.

    For an input value ``x`` and cycle length ``period`` the transformer
    appends two columns:

    * ``<outputCol>_sin`` = sin(2 * pi * x / period)
    * ``<outputCol>_cos`` = cos(2 * pi * x / period)

    No column named exactly ``outputCol`` is created, so downstream stages
    must reference the two suffixed names.

    ``period`` is stored as a plain Python attribute, not as a Spark ``Param``.
    NOTE(review): it is therefore presumably not written by
    ``DefaultParamsWritable``; a reloaded instance would fall back to the
    default period - confirm before persisting a pipeline with this stage.
    """

    inputCol = Param(Params._dummy(), "inputCol", "input column name.", typeConverter=TypeConverters.toString)
    outputCol = Param(Params._dummy(), "outputCol", "output column name.", typeConverter=TypeConverters.toString)

    @keyword_only
    def __init__(self, inputCol: str = "input", outputCol: str = "output", period: int = 12):
        """Create the transformer.

        Args:
            inputCol: Name of the column holding the cyclical value.
            outputCol: Prefix of the two generated columns.
            period: Length of one full cycle, e.g. 12 for months.
        """
        super(CyclicTransformer, self).__init__()
        self._setDefault(inputCol=None, outputCol=None)
        # keyword_only records only the keywords the caller actually passed,
        # so "period" may be absent; pop() with a default avoids a KeyError.
        kwargs = dict(self._input_kwargs)
        kwargs.pop("period", None)
        self.set_params(**kwargs)
        self.period = period

    @keyword_only
    def set_params(self, inputCol: str = "input", outputCol: str = "output"):
        """Set the column-name params from the explicitly passed keywords."""
        kwargs = self._input_kwargs
        self._set(**kwargs)

    def getInputCol(self):
        """Return the configured input column name."""
        return self.getOrDefault(self.inputCol)

    def getOutputCol(self):
        """Return the configured output column prefix."""
        return self.getOrDefault(self.outputCol)

    def _transform(self, df: DataFrame) -> DataFrame:
        """Return ``df`` with the ``_sin`` and ``_cos`` columns appended."""
        input_col = self.getInputCol()
        output_col = self.getOutputCol()

        # Position of the value on the unit circle, shared by both columns
        angle = 2 * math.pi * df[input_col] / self.period

        return df.withColumn(output_col + "_sin", sin(angle)).withColumn(output_col + "_cos", cos(angle))

In [None]:
# Create String indexers: each unique string of a categorical column gets a unique index.
# Rows with a value that was not seen during fit are skipped.
indexers = [
    StringIndexer(inputCol=name, outputCol="{0}_indexed".format(name)).setHandleInvalid("skip")
    for name in categorical_cols
]

# Encode the indexed columns using One Hot encoding
encoders = [
    OneHotEncoder(inputCol=indexer.getOutputCol(),
                  outputCol="{0}_encoded".format(indexer.getOutputCol()))
    for indexer in indexers
]

# Encode cyclical features using the custom Cyclic Transformer
cyclic_transformers = [
    CyclicTransformer(inputCol=name, outputCol=name + "_cyc_enc", period=period)
    for name, period in zip(cyclical_cols, periods)
]

# CyclicTransformer writes "<outputCol>_sin" and "<outputCol>_cos"; it never
# creates a column named outputCol itself, so the assembler must be given the
# two suffixed names or the transform fails on a missing column.
cyclic_feature_cols = [
    transformer.getOutputCol() + suffix
    for transformer in cyclic_transformers
    for suffix in ("_sin", "_cos")
]

# This will concatenate the input cols into a single column
assembler = VectorAssembler(
    inputCols=[encoder.getOutputCol() for encoder in encoders] + cyclic_feature_cols + numerical_cols,
    outputCol="features",
)

# Create a pipeline to use only a single fit and transform on the data
pipeline = Pipeline(stages=indexers + encoders + cyclic_transformers + [assembler])

# Fit the pipeline ==> This will call the fit functions of all stages that have one
model = pipeline.fit(final)

# Apply the fitted pipeline ==> This will call the transform functions of all stages
data = model.transform(final)

data.show()

In [None]:
# Keep only the assembled feature vector and the target; every other column is discarded
data = data.select("features", "label")

data.show()

#### Split the dataset

In [None]:
# Random (non-stratified) split: 60% of the rows for training, 40% for testing.
# The fixed seed makes the split reproducible.
train_data, test_data = data.randomSplit(weights=[0.6, 0.4], seed=10)

In [None]:
# A function to run commands
def run(command):
    """Run ``command`` through the shell and return what it wrote to stdout.

    Args:
        command: Shell command line; shell syntax such as redirection is
            interpreted.

    Returns:
        The command's captured standard output as a string.

    A failing command does not raise; a ``RuntimeWarning`` is issued instead
    so that the failure is no longer silent.
    """
    import warnings

    pipe = os.popen(command)
    try:
        output = pipe.read()
    finally:
        # close() waits for the child and releases the pipe; it returns None
        # on success and the wait status otherwise.
        status = pipe.close()

    if status is not None:
        warnings.warn(
            "command {!r} exited with wait status {}".format(command, status),
            RuntimeWarning,
        )
    return output

# Write the training split to HDFS as a single JSON part file
(
    train_data
    .select("features", "label")
    .coalesce(1)
    .write
    .mode("overwrite")
    .json("project/data/train")
)

# Merge the part file into the local data/ directory.
# Run it from root directory of the repository
run("hdfs dfs -cat project/data/train/*.json > data/train.json")

# Same export for the test split
(
    test_data
    .select("features", "label")
    .coalesce(1)
    .write
    .mode("overwrite")
    .json("project/data/test")
)

# Run it from root directory of the repository
run("hdfs dfs -cat project/data/test/*.json > data/test.json")

###  Modeling: First model

The first model is Linear Regression.

#### Build a model

In [None]:
# Create Linear Regression Model (all hyperparameters left at their defaults)
lr = LinearRegression()

# Fit the lr model on the training split
model_lr = lr.fit(train_data)

#### Prediction

In [None]:
# Transform the test split with the fitted model (Prediction)
predictions = model_lr.transform(test_data)

# Display the predictions
predictions.show()

#### Evaluation

In [None]:
# One evaluator per metric; both compare the "label" column with "prediction"
evaluator1_rmse = RegressionEvaluator(metricName="rmse", labelCol="label", predictionCol="prediction")
evaluator1_r2 = RegressionEvaluator(metricName="r2", labelCol="label", predictionCol="prediction")

# Score the baseline linear regression on the test predictions
rmse_lr = evaluator1_rmse.evaluate(predictions)
r2_lr = evaluator1_r2.evaluate(predictions)

print(
    "Root Mean Squared Error (RMSE) on test data = {}".format(rmse_lr),
    "R2 on test data = {}".format(r2_lr),
    sep="\n",
)

#### Hyperparameter optimization

In [None]:
# Regularisation strengths spaced geometrically between 1e-3 and 1e-1.
# np.geomspace takes the endpoints themselves; np.logspace(1e-3, 1e-1) would
# treat them as exponents and produce values of roughly 1.002..1.259.
# The default of 50 points is kept; pass num=... to shrink the search.
reg_params = np.geomspace(1e-3, 1e-1).tolist()

# The grid refers to the params of the estimator `lr`, the object the
# cross-validator tunes, rather than to those of an already fitted model.
grid = (
    ParamGridBuilder()
    .addGrid(lr.aggregationDepth, [2, 3, 4])
    .addGrid(lr.regParam, reg_params)
    .build()
)

# 3-fold cross-validation over the grid, selecting by RMSE
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=grid,
                    evaluator=evaluator1_rmse,
                    parallelism=5,
                    numFolds=3)

cvModel = cv.fit(train_data)
bestModel = cvModel.bestModel
bestModel

#### Select the best model

In [None]:
# Keep the cross-validation winner as model 1 and print its parameter map
model1 = bestModel
pprint(model1.extractParamMap())

#### Save the model to HDFS

In [None]:
# Save model 1 to HDFS, overwriting whatever is already stored at that path
model1.write().overwrite().save("project/models/model1")

# Copy the saved model from HDFS to the local models/ directory.
# NOTE(review): a local models/model1 left over from an earlier run may make
# `-get` fail or nest the copy - confirm and clean up beforehand if needed.
# Run it from root directory of the repository
run("hdfs dfs -get project/models/model1 models/model1")

#### Prediction of the best model 1

In [None]:
# Predict on the test split with the best model 1 and preview the result
predictions = model1.transform(test_data)
predictions.show()

In [None]:
# Write label/prediction pairs to HDFS as a single comma-separated CSV part file with a header
(
    predictions
    .select("label", "prediction")
    .coalesce(1)
    .write
    .mode("overwrite")
    .options(sep=",", header="true")
    .csv("project/output/model1_predictions.csv")
)

# Merge the part file into the local output/ directory.
# Run it from root directory of the repository
run("hdfs dfs -cat project/output/model1_predictions.csv/*.csv > output/model1_predictions.csv")

#### Evaluation of the best model 1

In [None]:
# One evaluator per metric; both compare the "label" column with "prediction"
evaluator1_rmse = RegressionEvaluator(metricName="rmse", labelCol="label", predictionCol="prediction")
evaluator1_r2 = RegressionEvaluator(metricName="r2", labelCol="label", predictionCol="prediction")

# Score the best model 1 on its test predictions
rmse1 = evaluator1_rmse.evaluate(predictions)
r2_1 = evaluator1_r2.evaluate(predictions)

print(
    "Root Mean Squared Error (RMSE) on test data = {}".format(rmse1),
    "R2 on test data = {}".format(r2_1),
    sep="\n",
)

###  Modeling: Second model


The second model is Decision Tree Regression.

#### Build a model

In [None]:
# Create Decision tree regression Model (all hyperparameters left at their defaults)
dt = DecisionTreeRegressor()

# Fit the decision tree model on the training split
model_dt = dt.fit(train_data)

#### Prediction

In [None]:
# Transform the test split with the fitted decision tree (Prediction)
predictions = model_dt.transform(test_data)

# Display the predictions
predictions.show()

#### Evaluation

In [None]:
# One evaluator per metric; both compare the "label" column with "prediction"
evaluator2_rmse = RegressionEvaluator(metricName="rmse", labelCol="label", predictionCol="prediction")
evaluator2_r2 = RegressionEvaluator(metricName="r2", labelCol="label", predictionCol="prediction")

# Score the baseline decision tree on the test predictions
rmse_dt = evaluator2_rmse.evaluate(predictions)
r2_dt = evaluator2_r2.evaluate(predictions)

print(
    "Root Mean Squared Error (RMSE) on test data = {}".format(rmse_dt),
    "R2 on test data = {}".format(r2_dt),
    sep="\n",
)

#### Hyperparameter optimization

In [None]:
# Search space for the decision tree: 4 depths x 2 bin counts = 8 candidates.
# The grid refers to the params of the estimator `dt`, the object the
# cross-validator tunes, rather than to those of an already fitted model.
grid = (
    ParamGridBuilder()
    .addGrid(dt.maxDepth, [5, 10, 15, 20])
    .addGrid(dt.maxBins, [10, 20])
    .build()
)

# 3-fold cross-validation over the grid, selecting by RMSE
cv = CrossValidator(estimator=dt,
                    estimatorParamMaps=grid,
                    evaluator=evaluator2_rmse,
                    parallelism=5,
                    numFolds=3)

cvModel = cv.fit(train_data)
bestModel = cvModel.bestModel
bestModel

#### Select the best model

In [None]:
# Keep the cross-validation winner as model 2 and print its parameter map
model2 = bestModel
pprint(model2.extractParamMap())

#### Save the model to HDFS

In [None]:
# Save model 2 to HDFS, overwriting whatever is already stored at that path
model2.write().overwrite().save("project/models/model2")

# Copy the saved model from HDFS to the local models/ directory.
# NOTE(review): a local models/model2 left over from an earlier run may make
# `-get` fail or nest the copy - confirm and clean up beforehand if needed.
# Run it from root directory of the repository
run("hdfs dfs -get project/models/model2 models/model2")

#### Prediction of the best model 2

In [None]:
# Predict on the test split with the best model 2 and preview the result
predictions = model2.transform(test_data)
predictions.show()

In [None]:
# Write label/prediction pairs to HDFS as a single comma-separated CSV part file with a header
(
    predictions
    .select("label", "prediction")
    .coalesce(1)
    .write
    .mode("overwrite")
    .options(sep=",", header="true")
    .csv("project/output/model2_predictions.csv")
)

# Merge the part file into the local output/ directory.
# Run it from root directory of the repository
run("hdfs dfs -cat project/output/model2_predictions.csv/*.csv > output/model2_predictions.csv")

#### Evaluation of the best model 2

In [None]:
# One evaluator per metric; both compare the "label" column with "prediction"
evaluator2_rmse = RegressionEvaluator(metricName="rmse", labelCol="label", predictionCol="prediction")
evaluator2_r2 = RegressionEvaluator(metricName="r2", labelCol="label", predictionCol="prediction")

# Score the best model 2 on its test predictions
rmse2 = evaluator2_rmse.evaluate(predictions)
r2_2 = evaluator2_r2.evaluate(predictions)

print(
    "Root Mean Squared Error (RMSE) on test data = {}".format(rmse2),
    "R2 on test data = {}".format(r2_2),
    sep="\n",
)

### Compare best models

In [None]:
# One report row per tuned model: its string form, test RMSE and test R2
models = [
    [str(model1), rmse1, r2_1],
    [str(model2), rmse2, r2_2],
]

# Turn the rows into a DataFrame and print it without truncating the model names
df = spark.createDataFrame(models, schema=["model", "RMSE", "R2"])
df.show(truncate=False)

In [None]:
# Save the comparison table to HDFS as a single comma-separated CSV part file with a header
(
    df
    .coalesce(1)
    .write
    .mode("overwrite")
    .options(sep=",", header="true")
    .csv("project/output/evaluation.csv")
)

# Merge the part file into the local output/ directory.
# Run it from root directory of the repository
run("hdfs dfs -cat project/output/evaluation.csv/*.csv > output/evaluation.csv")

## Stop spark

In [None]:
# Stop the Spark session now that all work is done
spark.stop()