In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, explode,substring, length, udf
from pyspark.sql.types import *
from pyspark.sql import Row
from itertools import cycle
from pyspark.ml.regression import LinearRegression
import numpy as np

import matplotlib.pyplot as plt
%matplotlib inline

from pyspark.sql import functions as F
from pyspark.sql import types as T

In [2]:
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [3]:
# spark is an custom SparkSession based on some config to work with Jupyter notebooks
iv = spark.read.csv("hdfs://localhost:9000/user/lavish/data/investments.csv"
                , header='true'
                , inferSchema='true')

In [4]:
startYear=1995
endYear=2015

In [5]:
type(iv)

pyspark.sql.dataframe.DataFrame

In [6]:
iv.schema.names

['company_permalink',
 'company_name',
 'company_category_list',
 'company_country_code',
 'company_state_code',
 'company_region',
 'company_city',
 'investor_permalink',
 'investor_name',
 'investor_country_code',
 'investor_state_code',
 'investor_region',
 'investor_city',
 'funding_round_permalink',
 'funding_round_type',
 'funding_round_code',
 'funded_at',
 'raised_amount_usd']

In [7]:
filteredIV = iv.filter(iv.raised_amount_usd.isNotNull())

In [8]:
from pyspark.sql.types import DoubleType
splittedCategoryIV = filteredIV.select('raised_amount_usd',  substring('funded_at',-4,4).cast('int').alias('year')
                       , split(col("company_category_list")
                       , "[|]s*").alias("categoryArr"))

In [9]:
explodedIV=splittedCategoryIV.select('raised_amount_usd','year', explode('categoryArr').alias('category'))

In [10]:
explodedIV.show(10)

+-----------------+----+-----------+
|raised_amount_usd|year|   category|
+-----------------+----+-----------+
|        2000000.0|2008|Curated Web|
|          41250.0|2014|      Games|
|            2.0E7|2015|  Analytics|
|        3000000.0|2013|  Analytics|
|            2.0E7|2015|  Analytics|
|        1700000.0|2013|  Analytics|
|        8900000.0|2014|  Analytics|
|            2.0E7|2015|  Analytics|
|            2.0E7|2015|  Analytics|
|        8900000.0|2014|  Analytics|
+-----------------+----+-----------+
only showing top 10 rows



In [11]:
explodedIV.createOrReplaceTempView("investments")

In [12]:
sqlDF = spark.sql("SELECT * FROM investments")
sqlDF.show(5)

+-----------------+----+-----------+
|raised_amount_usd|year|   category|
+-----------------+----+-----------+
|        2000000.0|2008|Curated Web|
|          41250.0|2014|      Games|
|            2.0E7|2015|  Analytics|
|        3000000.0|2013|  Analytics|
|            2.0E7|2015|  Analytics|
+-----------------+----+-----------+
only showing top 5 rows



### Year Wise

In [13]:
SQLQUERY =  """
            SELECT CATEGORY, 
            CAST(YEAR AS INT), 
            SUM(RAISED_AMOUNT_USD) AS TOTAL, 
            CAST(SUM(RAISED_AMOUNT_USD) AS DECIMAL(30)) AS TOTAL_DEC 
            FROM INVESTMENTS GROUP 
            BY CATEGORY, YEAR 
            """
#  ORDER BY YEAR DESC, TOTAL DESC

In [14]:
sqlDF = spark.sql(SQLQUERY)
sqlDF.show(5)

+------------------+----+-----------------+----------+
|          CATEGORY|YEAR|            TOTAL| TOTAL_DEC|
+------------------+----+-----------------+----------+
|    Interest Graph|2011|           3.28E7|  32800000|
|         Insurance|2015|  5.70529580149E9|5705295801|
|Big Data Analytics|2013|     2.35683979E9|2356839790|
|         Aerospace|2014|4.6013734510098E8| 460137345|
|             Audio|2005|          1.058E8| 105800000|
+------------------+----+-----------------+----------+
only showing top 5 rows



In [15]:
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler(inputCols = ['YEAR'], outputCol = 'FEATURES')
featureDF = vectorAssembler.transform(sqlDF).select('CATEGORY', 'FEATURES', 'TOTAL')

featureDF.show(5)

+------------------+--------+-----------------+
|          CATEGORY|FEATURES|            TOTAL|
+------------------+--------+-----------------+
|    Interest Graph|[2011.0]|           3.28E7|
|         Insurance|[2015.0]|  5.70529580149E9|
|Big Data Analytics|[2013.0]|     2.35683979E9|
|         Aerospace|[2014.0]|4.6013734510098E8|
|             Audio|[2005.0]|          1.058E8|
+------------------+--------+-----------------+
only showing top 5 rows



In [16]:
f = featureDF.select('CATEGORY').distinct()


In [17]:
topCategories = [row.CATEGORY for row in f.collect()]


In [18]:
len(topCategories)

837

In [19]:
# Features matrix to predict the amount for the Year 2020

l =  [(2020,)]

rdd = sc.parallelize(l)
test = rdd.map(lambda x: Row(YEAR=x[0] ))
testDF = sqlContext.createDataFrame(test)

vectorAssembler = VectorAssembler(inputCols = ['YEAR'], outputCol = 'FEATURES')
vectorDF = vectorAssembler.transform(testDF).select('FEATURES')

In [20]:
#count = 0 
summaryInfo = []
columns = ['Category', 'Gradient', 'Intercept','RMSE', 'R2', 'Pediction']

statInfo = []
for category in topCategories:
    #print(category)
    categoryDF=featureDF.filter(featureDF.CATEGORY == category)
    if(categoryDF.count() > 10):
        #count +=1
        lr = LinearRegression(featuresCol = 'FEATURES', labelCol='TOTAL', maxIter=10, regParam=0.3, elasticNetParam=0.8)
        lr_model = lr.fit(categoryDF)
        if (lr_model.summary.r2 >= .5):
            row=(category
                 ,lr_model.coefficients
                 ,lr_model.intercept
                 ,lr_model.summary.rootMeanSquaredError
                 ,lr_model.summary.r2
                 ,lr_model.transform(vectorDF).take(1)[0].prediction )
            summaryInfo.append(row)                        
summaryDF = spark.createDataFrame(summaryInfo, columns)


In [28]:
summaryDF.show()

+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+
|            Category|            Gradient|           Intercept|                RMSE|                R2|           Pediction|
+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+
|    Personal Finance|[3.09531844542449...|-6.20846579526586...| 1.140333317135928E9|0.5257188832329348| 4.407746449161743E9|
|Reviews and Recom...|[9.38435051052876...|-1.88076601007935...|2.2526002798534468E8|0.6748252913533789|1.4872793047452393E9|
|         Health Care|[1.08986421814419E9]|-2.17925088637451...|2.0237455217204463E9|0.8747611056595241|2.227483427674951E10|
|Application Perfo...|[1.46386390370918...|-2.93690139320354...|4.7040592338190275E8|0.5668998218851391| 2.010369228901245E9|
|        Credit Cards|[6.33419228608998...|-1.27009681219964...|2.5974312104301742E8|0.5027780036421106| 9.41002959053

In [29]:
summaryDF.write.('summaryDF.csv')

TypeError: 'DataFrameWriter' object is not callable

In [30]:
summaryDF.write.saveAsTable("summaryDF")

In [31]:
SQLQUERY =  """
            SELECT*
            FROM summaryDF
            
            """
#  ORDER BY YEAR DESC, TOTAL DESC

sqlDF = spark.sql(SQLQUERY)
sqlDF.show(5)

+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+
|            Category|            Gradient|           Intercept|                RMSE|                R2|           Pediction|
+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+
|                Kids|[3.18674767862354...|-6.39071308213002...| 9.505712557482366E7|0.5291670832619793| 4.651722868952179E8|
|Location Based Se...|[5.78892288743861...|-1.15886232468249...|1.6588587121286646E8|0.7474126418043531|1.0500098580102386E9|
|           Lifestyle|[1.34058267510281...|-2.68492723540129...| 5.407734746394048E8|0.5876254467293467| 2.304976830638977E9|
|Mobile Software T...|[6.86504849357692...|-1.37568550275783...|2.2023283087968177E8|0.6460663909210812|1.1054292944698792E9|
|             Storage|[3.37263853573364...|-6.75052385972356...|1.4688655636029084E9|0.5292015214853756| 6.22059824584