In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, explode,substring, length, udf
from pyspark.sql.types import DecimalType, StringType
from pyspark.sql.types import DoubleType

from pyspark.sql.types import *
from pyspark.sql import Row
from itertools import cycle
from pyspark.ml.regression import LinearRegression
from pyspark.sql import functions as F
from pyspark.sql import types as T

import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

In [2]:
# Setting up spark
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()


print(spark)
print(spark.version)

<pyspark.sql.session.SparkSession object at 0x7fbf5f5ba910>
3.0.0-preview2


In [3]:
topinves_df=spark.read.csv("hdfs://localhost:9000/eda/dc_investments.csv", header='true', inferSchema='true')

In [4]:
columns_to_drop = ['_c0', 'company_permalink','investor_permalink','funding_round_permalink']
topinves_df = topinves_df.drop(*columns_to_drop)

In [5]:
topinves_df.printSchema()

root
 |-- company_name: string (nullable = true)
 |-- company_category_list: string (nullable = true)
 |-- company_country_code: string (nullable = true)
 |-- company_region: string (nullable = true)
 |-- company_city: string (nullable = true)
 |-- investor_name: string (nullable = true)
 |-- investor_country_code: string (nullable = true)
 |-- funding_round_type: string (nullable = true)
 |-- funded_at: string (nullable = true)
 |-- raised_amount_usd: double (nullable = true)



In [6]:
topinves_df.select('raised_amount_usd', 'funded_at','company_category_list').filter(topinves_df.company_category_list.contains('|')).show(5, truncate = False)

+-----------------+----------+----------------------------+
|raised_amount_usd|funded_at |company_category_list       |
+-----------------+----------+----------------------------+
|500000.0         |2009-05-15|Art|E-Commerce|Marketplaces |
|500000.0         |2009-05-15|Art|E-Commerce|Marketplaces |
|500000.0         |2009-05-15|Art|E-Commerce|Marketplaces |
|6602693.89911084 |2015-11-04|Local Businesses|Restaurants|
|1736910.0        |2013-11-13|Local Businesses|Restaurants|
+-----------------+----------+----------------------------+
only showing top 5 rows



In [7]:
splitCategory = topinves_df.select('raised_amount_usd',  substring('funded_at',-0,4).cast('int').alias('year')
                       , split(col("company_category_list")
                       , "[|]s*").alias("category_list")).filter(col('year') >= 1990)
splitCategory.show(5)

+-----------------+----+-------------+
|raised_amount_usd|year|category_list|
+-----------------+----+-------------+
|        2000000.0|2008|[Curated Web]|
|        6000000.0|2014|   [Software]|
|          41250.0|2014|      [Games]|
|            2.0E7|2015|  [Analytics]|
|        3000000.0|2013|  [Analytics]|
+-----------------+----+-------------+
only showing top 5 rows



In [8]:
open_list=splitCategory.select('raised_amount_usd','year', explode('category_list').alias('category'))
open_list.show(10)

+-----------------+----+-----------+
|raised_amount_usd|year|   category|
+-----------------+----+-----------+
|        2000000.0|2008|Curated Web|
|        6000000.0|2014|   Software|
|          41250.0|2014|      Games|
|            2.0E7|2015|  Analytics|
|        3000000.0|2013|  Analytics|
|            2.0E7|2015|  Analytics|
|        1700000.0|2013|  Analytics|
|        8900000.0|2014|  Analytics|
|            2.0E7|2015|  Analytics|
|        8900000.0|2014|  Analytics|
+-----------------+----+-----------+
only showing top 10 rows



In [9]:
open_list.createOrReplaceTempView("comparision")

In [10]:
sql2 =  """
            SELECT CATEGORY, 
            CAST(YEAR AS INT), 
            SUM(RAISED_AMOUNT_USD) AS TOTAL, 
            CAST(SUM(RAISED_AMOUNT_USD) AS DECIMAL(30)) AS TOTAL_DEC 
            FROM comparision GROUP 
            BY CATEGORY, YEAR 
            """

In [11]:
df2 = spark.sql(sql2)
df2.show(7)

+--------------------+----+-------------------+----------+
|            CATEGORY|YEAR|              TOTAL| TOTAL_DEC|
+--------------------+----+-------------------+----------+
|      Interest Graph|2011|             1.86E7|  18600000|
|  Big Data Analytics|2013|      2.474244075E9|2474244075|
|           Aerospace|2014|4.575373451009297E8| 457537345|
|               Audio|2005|            1.118E8| 111800000|
|Cloud Infrastructure|2010|       1.26510796E8| 126510796|
|    Cloud Management|2010|           5.5378E8| 553780000|
|                Apps|2008|6.918205280557648E8| 691820528|
+--------------------+----+-------------------+----------+
only showing top 7 rows



In [12]:
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler(inputCols = ['YEAR'], outputCol = 'FEATURES')
featureDF = vectorAssembler.transform(df2).select('CATEGORY', 'FEATURES', 'TOTAL')

featureDF.show(7)

+--------------------+--------+-------------------+
|            CATEGORY|FEATURES|              TOTAL|
+--------------------+--------+-------------------+
|      Interest Graph|[2011.0]|             1.86E7|
|  Big Data Analytics|[2013.0]|      2.474244075E9|
|           Aerospace|[2014.0]|4.575373451009297E8|
|               Audio|[2005.0]|            1.118E8|
|Cloud Infrastructure|[2010.0]|       1.26510796E8|
|    Cloud Management|[2010.0]|           5.5378E8|
|                Apps|[2008.0]|6.918205280557648E8|
+--------------------+--------+-------------------+
only showing top 7 rows



In [17]:
featureDF.printSchema()

root
 |-- CATEGORY: string (nullable = true)
 |-- FEATURES: vector (nullable = true)
 |-- TOTAL: double (nullable = true)



In [13]:
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols = ['YEAR'], outputCol = 'features')
data = vectorAssembler.transform(df2)
data = data.select('features','TOTAL')
data.show(3)

+--------+-------------------+
|features|              TOTAL|
+--------+-------------------+
|[2011.0]|             1.86E7|
|[2013.0]|      2.474244075E9|
|[2014.0]|4.575373451009297E8|
+--------+-------------------+
only showing top 3 rows



In [14]:
splits = data.randomSplit([0.7, 0.3])
train_df=splits[0]
test_df=splits[1]

In [18]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
tr = DecisionTreeRegressor(featuresCol ='features', labelCol = 'TOTAL')

tr_model = tr.fit(train_df)
tr_predictions = tr_model.transform(test_df)
tr_evaluator = RegressionEvaluator(
    labelCol="TOTAL", predictionCol="prediction", metricName="r2")
rmse = tr_evaluator.evaluate(tr_predictions)
print("R Squared (R2) on test data  = %g" % rmse)
tr_model

R Squared (R2) on test data  = 0.0282254


DecisionTreeRegressionModel: uid=DecisionTreeRegressor_be5d598359f6, depth=5, numNodes=25, numFeatures=1

In [19]:
tr = DecisionTreeRegressor(featuresCol ='features', labelCol = 'TOTAL')
tr_model = tr.fit(train_df)
tr_predictions = tr_model.transform(test_df)
tr_evaluator = RegressionEvaluator(
labelCol="TOTAL", predictionCol="prediction", metricName="rmse")
rmse = tr_evaluator.evaluate(tr_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 2.12511e+09


In [22]:
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols = ['YEAR'], outputCol = 'features')
data = vectorAssembler.transform(df2)
data = data.select('features','TOTAL')
data.show(3)

+--------+-------------------+
|features|              TOTAL|
+--------+-------------------+
|[2011.0]|             1.86E7|
|[2013.0]|      2.474244075E9|
|[2014.0]|4.575373451009297E8|
+--------+-------------------+
only showing top 3 rows



In [29]:
df = data.selectExpr("features as Features", "TOTAL as label")
df.show(5)

+--------+-------------------+
|Features|              label|
+--------+-------------------+
|[2011.0]|             1.86E7|
|[2013.0]|      2.474244075E9|
|[2014.0]|4.575373451009297E8|
|[2005.0]|            1.118E8|
|[2010.0]|       1.26510796E8|
+--------+-------------------+
only showing top 5 rows



In [30]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.mllib.util import MLUtils

featureIndexer =\
    VectorIndexer(inputCol="Features", outputCol="indexedFeatures", maxCategories=4).fit(df)
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = df.randomSplit([0.7, 0.3])

# Train a DecisionTree model.
dt = DecisionTreeRegressor(featuresCol="indexedFeatures")

# Chain indexer and tree in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, dt])

# Train model.  This also runs the indexer.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

treeModel = model.stages[1]
print(treeModel) # summary only

+--------------------+-------------------+--------+
|          prediction|              label|features|
+--------------------+-------------------+--------+
|2.6237818364195803E8|          1800000.0|[2001.0]|
|2.6237818364195803E8|            2.844E7|[2003.0]|
|3.2892324007344264E8|            2.986E7|[2005.0]|
|3.2892324007344264E8|            1.118E8|[2005.0]|
|  4.76924685647094E8|6.918205280557648E8|[2008.0]|
+--------------------+-------------------+--------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 2.53798e+09
DecisionTreeRegressionModel: uid=DecisionTreeRegressor_591848c9e12a, depth=5, numNodes=25, numFeatures=1
