# Importing Libraries

In [None]:
!pip install pandas
!pip install numpy
!pip install matplotlib
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m4.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m15.8 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=p

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# Load functionality to manipulate dataframes
from pyspark.sql import functions as fn
from pyspark.sql.functions import col, monotonically_increasing_id, split, when

# Functionality for computing features
from pyspark.ml import feature, regression, classification, Pipeline, clustering
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.feature import RFormula, Tokenizer, VectorAssembler, HashingTF, Word2Vec
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession

from itertools import chain
from pyspark.ml.linalg import Vectors, VectorUDT

#Evaluation
from pyspark.ml.evaluation import BinaryClassificationEvaluator

## Creating Spark Session

In [None]:
spark = SparkSession \
    .builder \
    .appName("how to read csv file") \
    .getOrCreate()

# Reading in the datasets

In [None]:
dfacq = spark.read.format("csv").load("/content/drive/MyDrive/Data/acquisitions.csv", delimiter = ",", header = True)
dfadd = spark.read.format("csv").load("/content/drive/MyDrive/Data/additions.csv", delimiter = ",", header = True)
dfcom = spark.read.format("csv").load("/content/drive/MyDrive/Data/companies.csv", delimiter = ",", header = True)
dfinv = spark.read.format("csv").load("/content/drive/MyDrive/Data/investments.csv", delimiter = ",", header = True)
dfrou = spark.read.format("csv").load("/content/drive/MyDrive/Data/rounds.csv", delimiter = ",", header = True)

# Taking the necessary columns alone

In [None]:
dfcom = dfcom.select(col("permalink"), col("name"), col("homepage_url"), col("category_list"), col(" market ").alias("market"), col(" funding_total_usd ").alias("funding_total_usd"), col("status"), col("country_code"), col("state_code"), col("region"), col("city"), col("funding_rounds"), col("founded_at"), col("founded_month"), col("founded_quarter"), col("founded_year"), col("first_funding_at"), col("last_funding_at"))
dfacq = dfacq.select(col("company_permalink"), col("company_name"), col("company_category_list"), col("company_market"), col("company_country_code"), col("company_state_code"), col("company_region"), col("company_city"), col("acquirer_permalink"), col("acquirer_name"), col("acquirer_category_list"), col("acquirer_market"), col("acquirer_country_code"), col("acquirer_state_code"), col("acquirer_region"), col("acquirer_city"), col("acquired_at"), col("acquired_month"), col("acquired_quarter"), col("acquired_year"), col(" price_amount ").alias("price_amount"), col("price_currency_code"))
dfrou = dfrou.select(col("company_permalink"), col("company_name"), col("company_category_list"), col("company_market"), col("company_country_code"), col("company_state_code"), col("company_region"), col("company_city"), col("funding_round_permalink"), col("funding_round_type"), col("funding_round_code"), col("funded_at"), col("funded_month"), col(" funded_quarter ").alias("funded_quarter"), col("funded_year"), col(" raised_amount_usd ").alias("raised_amount_usd"))

# Examination of the Company dataset

In [None]:
dfcom.count()

54294

In [None]:
#checking for null values
dfcom.toPandas().isnull().sum()

permalink             4856
name                  4856
homepage_url          8305
category_list         8817
market                8824
funding_total_usd     4856
status                6170
country_code         10129
state_code           24133
region               10129
city                 10972
funding_rounds        4856
founded_at           15740
founded_month        15812
founded_quarter      15812
founded_year         15812
first_funding_at      4856
last_funding_at       4856
dtype: int64

In [None]:
#Null values in closed category
dfcom.filter(col("status") == "closed").toPandas().isnull().sum()

permalink               0
name                    0
homepage_url           31
category_list          78
market                 78
funding_total_usd       0
status                  0
country_code          408
state_code           1074
region                408
city                  441
funding_rounds          0
founded_at            607
founded_month         607
founded_quarter       607
founded_year          607
first_funding_at        0
last_funding_at         0
dtype: int64

In [None]:
#Null values in operating category
dfcom.filter(col("status") == "operating").toPandas().isnull().sum()

permalink                0
name                     0
homepage_url          3092
category_list         3353
market                3358
funding_total_usd        0
status                   0
country_code          4439
state_code           16696
region                4439
city                  5166
funding_rounds           0
founded_at            9165
founded_month         9232
founded_quarter       9232
founded_year          9232
first_funding_at         0
last_funding_at          0
dtype: int64

In [None]:
#Null values in acquired category
dfcom.filter(col("status") == "acquired").toPandas().isnull().sum()

permalink              0
name                   0
homepage_url         254
category_list        151
market               153
funding_total_usd      0
status                 0
country_code         220
state_code           804
region               220
city                 250
funding_rounds         0
founded_at           716
founded_month        721
founded_quarter      721
founded_year         721
first_funding_at       0
last_funding_at        0
dtype: int64

### Different markets and their occurence

In [None]:
markets = dfcom.select("market").\
  groupby(col("market")).\
  agg(fn.count("market")).\
  sort("count(market)", ascending=False)

In [None]:
display(markets.take(10))

[Row(market=' Software ', count(market)=4620),
 Row(market=' Biotechnology ', count(market)=3688),
 Row(market=' Mobile ', count(market)=1983),
 Row(market=' E-Commerce ', count(market)=1805),
 Row(market=' Curated Web ', count(market)=1655),
 Row(market=' Enterprise Software ', count(market)=1280),
 Row(market=' Health Care ', count(market)=1207),
 Row(market=' Clean Technology ', count(market)=1200),
 Row(market=' Games ', count(market)=1182),
 Row(market=' Hardware + Software ', count(market)=1081)]

In [None]:
#Number of unique markets
markets.count()

754

### Cities where most startups are located

In [None]:
cities = dfcom.select("permalink", "name", "city").\
  groupBy("city").\
  agg(fn.count("city")).\
  sort("count(city)", ascending=False)

In [None]:
#Displaying top 10 cities 
display(cities.take(10))

[Row(city='San Francisco', count(city)=2615),
 Row(city='New York', count(city)=2334),
 Row(city='London', count(city)=1257),
 Row(city='Palo Alto', count(city)=597),
 Row(city='Austin', count(city)=583),
 Row(city='Cambridge', count(city)=554),
 Row(city='Seattle', count(city)=554),
 Row(city='Chicago', count(city)=514),
 Row(city='Los Angeles', count(city)=508),
 Row(city='Mountain View', count(city)=497)]

### Countries where most startups are located

In [None]:
countries = dfcom.select("permalink", "name", "country_code").\
  groupBy("country_code").\
  agg(fn.count("country_code")).\
  sort("count(country_code)", ascending=False)

In [None]:
display(countries.take(10))

[Row(country_code='USA', count(country_code)=28793),
 Row(country_code='GBR', count(country_code)=2642),
 Row(country_code='CAN', count(country_code)=1405),
 Row(country_code='CHN', count(country_code)=1239),
 Row(country_code='DEU', count(country_code)=968),
 Row(country_code='FRA', count(country_code)=866),
 Row(country_code='IND', count(country_code)=849),
 Row(country_code='ISR', count(country_code)=682),
 Row(country_code='ESP', count(country_code)=549),
 Row(country_code='RUS', count(country_code)=368)]

# Examination of the acquisition dataset

In [None]:
display(dfacq)

DataFrame[company_permalink: string, company_name: string, company_category_list: string, company_market: string, company_country_code: string, company_state_code: string, company_region: string, company_city: string, acquirer_permalink: string, acquirer_name: string, acquirer_category_list: string, acquirer_market: string, acquirer_country_code: string, acquirer_state_code: string, acquirer_region: string, acquirer_city: string, acquired_at: string, acquired_month: string, acquired_quarter: string, acquired_year: string, price_amount: string, price_currency_code: string]

In [None]:
dfacq.count()

55240

### Companies that acquire the most

In [None]:
acquirers = dfacq.\
  select("acquirer_name").\
  groupby(col("acquirer_name")).\
  agg(fn.count("acquirer_name")).\
  sort("count(acquirer_name)", ascending=False)

In [None]:
#displaying top 10 comapanies
display(acquirers.take(10))

[Row(acquirer_name='Cisco', count(acquirer_name)=168),
 Row(acquirer_name='Google', count(acquirer_name)=165),
 Row(acquirer_name='Microsoft', count(acquirer_name)=156),
 Row(acquirer_name='IBM', count(acquirer_name)=122),
 Row(acquirer_name='Yahoo!', count(acquirer_name)=116),
 Row(acquirer_name='Oracle Corporation', count(acquirer_name)=90),
 Row(acquirer_name='Hewlett-Packard', count(acquirer_name)=88),
 Row(acquirer_name='AOL', count(acquirer_name)=64),
 Row(acquirer_name='EMC', count(acquirer_name)=63),
 Row(acquirer_name='Intel', count(acquirer_name)=59)]

# Examination of the investments data

In [None]:
display(dfinv)

DataFrame[company_permalink: string, company_name: string, company_category_list: string, company_market: string, company_country_code: string, company_state_code: string, company_region: string, company_city: string, investor_permalink: string, investor_name: string, investor_category_list: string, investor_market: string, investor_country_code: string, investor_state_code: string, investor_region: string, investor_city: string, funding_round_permalink: string, funding_round_type: string, funding_round_code: string, funded_at: string, funded_month: string, funded_quarter: string, funded_year: string, raised_amount_usd: string]

In [None]:
dfinv.count()

114506

### Location of the most investors - city wise

In [None]:
investorcities = dfinv.\
  select("investor_city",).\
  groupby(col("investor_city")).\
  agg(fn.count("investor_city")).\
  sort("count(investor_city)", ascending=False)

In [None]:
#Displaying top 10 cities
display(investorcities.take(10))

[Row(investor_city='Menlo Park', count(investor_city)=9962),
 Row(investor_city='New York', count(investor_city)=7030),
 Row(investor_city='San Francisco', count(investor_city)=5934),
 Row(investor_city='Palo Alto', count(investor_city)=5224),
 Row(investor_city='London', count(investor_city)=3146),
 Row(investor_city='Boston', count(investor_city)=2637),
 Row(investor_city='Mountain View', count(investor_city)=1786),
 Row(investor_city='Cambridge', count(investor_city)=1761),
 Row(investor_city='Paris', count(investor_city)=1708),
 Row(investor_city='Chicago', count(investor_city)=1261)]

### Number of distinct investors

In [None]:
dist_inv = dfinv.\
   select("investor_permalink").\
   distinct()

In [None]:
dist_inv.count()

22277

### Most invested - By year

In [None]:
mon_year = dfinv.\
   select("funded_year", "raised_amount_usd").\
   groupby("funded_year").\
   agg(fn.sum("raised_amount_usd")).\
   sort("funded_year")

In [None]:
display(mon_year.take(10))

[Row(funded_year='1921', sum(raised_amount_usd)=None),
 Row(funded_year='1974', sum(raised_amount_usd)=None),
 Row(funded_year='1979', sum(raised_amount_usd)=None),
 Row(funded_year='1982', sum(raised_amount_usd)=None),
 Row(funded_year='1983', sum(raised_amount_usd)=None),
 Row(funded_year='1984', sum(raised_amount_usd)=None),
 Row(funded_year='1985', sum(raised_amount_usd)=None),
 Row(funded_year='1986', sum(raised_amount_usd)=None),
 Row(funded_year='1987', sum(raised_amount_usd)=None),
 Row(funded_year='1988', sum(raised_amount_usd)=None)]

# Examinations of rounds data

In [None]:
display(dfrou)

DataFrame[company_permalink: string, company_name: string, company_category_list: string, company_market: string, company_country_code: string, company_state_code: string, company_region: string, company_city: string, funding_round_permalink: string, funding_round_type: string, funding_round_code: string, funded_at: string, funded_month: string, funded_quarter: string, funded_year: string, raised_amount_usd: string]

In [None]:
dfrou.count()

83870

### Top funding Round Types

In [None]:
top_round_typ = dfrou.\
   select("funding_round_type").\
   groupby("funding_round_type").\
   agg(fn.count("funding_round_type")).\
   sort("count(funding_round_type)",ascending = False)

In [None]:
display(top_round_typ.take(10))

[Row(funding_round_type='venture', count(funding_round_type)=41742),
 Row(funding_round_type='seed', count(funding_round_type)=21036),
 Row(funding_round_type='debt_financing', count(funding_round_type)=5692),
 Row(funding_round_type='angel', count(funding_round_type)=4443),
 Row(funding_round_type='undisclosed', count(funding_round_type)=3871),
 Row(funding_round_type='equity_crowdfunding', count(funding_round_type)=2256),
 Row(funding_round_type='private_equity', count(funding_round_type)=1828),
 Row(funding_round_type='grant', count(funding_round_type)=1476),
 Row(funding_round_type='convertible_note', count(funding_round_type)=759),
 Row(funding_round_type='post_ipo_equity', count(funding_round_type)=395)]

# Creating the Master dataset

In [None]:
# Replace + with | , see Hardware+Software
df_categories_pd = dfcom.select("category_list").toPandas()
k = []
for i in df_categories_pd.category_list:
    i = str(i)
    j = i.replace(' + ','|')
    k.append(j)

In [None]:
from pyspark.sql import SQLContext
sql = SQLContext(spark)



In [None]:
# Create Spark DF with new Category list
mylist_pd_k = pd.DataFrame({"category_list": k})
df_cat_k = sql.createDataFrame(mylist_pd_k)

In [None]:
permalink_pd = dfcom.select("permalink").toPandas()

In [None]:
df_categories_pd = mylist_pd_k

mylist = []
for i in range(len(df_categories_pd)):  
  mystring = str(df_categories_pd.category_list[i])
  myremover = mystring.split("|")
  #str_list = filter(None, myremover)
  mylist.append(myremover)

mylist_pd = pd.DataFrame({"category": mylist, "permalink": permalink_pd.permalink})
df_cat = sql.createDataFrame(mylist_pd)

# counter vector + kmeans clustering + fitting and transforming

In [None]:
cv = feature.CountVectorizer(inputCol='category', outputCol='tf')
kmeans = clustering.KMeans(k=400, featuresCol='tf', predictionCol='kmeans_feat')
pipeline_model = Pipeline(stages=[cv, kmeans]).fit(df_cat)
df_catid_kmeans = pipeline_model.transform(df_cat)

# concatenate the categories which are within one kmeans cluster

In [None]:
df_catid_kmeans_concat = df_catid_kmeans.\
groupby("kmeans_feat").\
agg(fn.collect_list(col("category")).alias("category")).\
sort("kmeans_feat")

# Custom Functions

In [None]:
#flatten list spark df to spark df
def flatten_list(df):
  test_pd = df.toPandas()
  mylist1 = []
  klist = []
  for i in range(len(test_pd.kmeans_feat)):
    newlist = list(chain.from_iterable(test_pd.category[i]))
    mylist1.append(newlist)
    klist.append(i)
  transferdf_pd = pd.DataFrame({"kmeans_feat": klist, "category": mylist1})
  newcatewoo = spark.createDataFrame(transferdf_pd)
  return newcatewoo

In [None]:
def three_highest_catpluscount(df):  
  #to pandas
  test1 = df.select("tf").toPandas()
  maxi = []
  maxv = [0,0,0]
  indexlist = []
  valuelist = []
  kmeanlist = []
  for a in range(len(test1.tf)):
    for i in range(len(test1.tf[a])):
      if (min(maxv) < test1.tf[a][i]):
        maxv[0] = test1.tf[a][i]
        maxv = sorted(maxv)
    testarr = test1.tf[a].toArray()
    for n in range(2, -1, -1):
      index = np.where(testarr == maxv[n])
      maxi.append(index[0][0])
      valuelist.append(maxv[n])
      kmeanlist.append(a)
    maxv = [0,0,0]
  kmeancate_pd = pd.DataFrame({"categoryindex": maxi, "kmean_feat": kmeanlist, "count": valuelist})
  kmeancate = spark.createDataFrame(kmeancate_pd)
  return kmeancate

In [None]:
# add categories to index
def add_category_words(df):
  kmeancate_pd = df.toPandas()
  mylisti = []
  for t in range(len(kmeancate_pd)):
    ind = kmeancate_pd.categoryindex[t]
    mylisti.append(wordvector_catwoo[ind])
  kmeancate_pd["category"] = mylisti
  kmeancate_words = spark.createDataFrame(kmeancate_pd)
  return kmeancate_words

In [None]:
def create_final_category(df):
  kmeancate_pd = df.toPandas()
  multiplecate = []
  categorylist = []
  kmeanlist = []
  for o in range(0,len(kmeancate_pd),3):
    multiplecate.append(kmeancate_pd["category"][o])
    if((kmeancate_pd["count"][o]*0.7 <= kmeancate_pd["count"][o+1]) and (kmeancate_pd["category"][o] != kmeancate_pd["category"][o+1])):
      multiplecate.append(kmeancate_pd["category"][o+1])
      if((kmeancate_pd["count"][o+1]*0.7 <= kmeancate_pd["count"][o+2]) and (kmeancate_pd["category"][o+1] != kmeancate_pd["category"][o+2])):
        multiplecate.append(kmeancate_pd["category"][o+2])
    categorystring = " + ".join(multiplecate)
    categorylist.append(categorystring)
    kmeanlist.append(o/3)
    multiplecate = []
  final_pd = pd.DataFrame({"category_final": categorylist, "kmean_feat": kmeanlist})
  final = spark.createDataFrame(final_pd)
  return final

In [None]:
def blank_as_null(x):
    return when(col(x) != "", col(x)).otherwise(None)

In [None]:
def two_highest_catpluscount(df):  
  #to pandas
  test1 = df.toPandas()
  maxi = []
  maxv = [0]
  indexlist = []
  valuelist = []
  kmeanlist = []
  for a in range(len(test1.tf)):
    for i in range(len(test1.tf[a])):
      if (min(maxv) < test1.tf[a][i]):
        maxv[0] = test1.tf[a][i]
        maxv = sorted(maxv)
    testarr = test1.tf[a].toArray()
    for n in range(1):
      index = np.where(testarr == maxv[n])
      maxi.append(index[0][0])
      valuelist.append(maxv[n])
      kmeanlist.append(test1.permalink[a])
    maxv = [0]
  kmeancate_pd = pd.DataFrame({"categoryindex": maxi, "perma": kmeanlist, "count": valuelist})
  kmeancate = spark.createDataFrame(kmeancate_pd)
  return kmeancate

In [None]:
# add categories to index
def add_inv_words(df):
  kmeancate_pd = df.toPandas()
  mylisti = []
  for t in range(len(kmeancate_pd)):
    ind = kmeancate_pd.categoryindex[t]
    mylisti.append(inv_words[ind])
  kmeancate_pd["category"] = mylisti
  kmeancate_words = spark.createDataFrame(kmeancate_pd)
  return kmeancate_words

In [None]:
# add categories to index
def add_round_words(df):
  kmeancate_pd = df.toPandas()
  mylisti = []
  for t in range(len(kmeancate_pd)):
    ind = kmeancate_pd.categoryindex[t]
    mylisti.append(round_words[ind])
  kmeancate_pd["category"] = mylisti
  kmeancate_words = spark.createDataFrame(kmeancate_pd)
  return kmeancate_words

In [None]:
#flatten list spark df to spark df
newcatewoo = flatten_list(df_catid_kmeans_concat)

In [None]:
# counter vectorizer
cv = feature.CountVectorizer(inputCol='category', outputCol='tf')
# cv fitting and df transformation
cv_model = cv.fit(newcatewoo)
df_catwoo_cv = cv_model.transform(newcatewoo)

In [None]:
#get the three categories occuring most often for each cluster
kmeancate = three_highest_catpluscount(df_catwoo_cv)

In [None]:
wordvector_catwoo = cv_model.vocabulary
wordvector_catwoo

['',
 'None',
 'Software',
 'Mobile',
 'Biotechnology',
 'E-Commerce',
 'Curated Web',
 'Social Media',
 'Enterprise Software',
 'Advertising',
 'Games',
 'Hardware',
 'Health Care',
 'Finance',
 'Education',
 'Clean Technology',
 'Analytics',
 'Health and Wellness',
 'SaaS',
 'Internet',
 'Apps',
 'Manufacturing',
 'Security',
 'Video',
 'Fashion',
 'Search',
 'Consulting',
 'Travel',
 'News',
 'Cloud Computing',
 'Web Hosting',
 'Hospitality',
 'Music',
 'Networking',
 'Technology',
 'Social Network Media',
 'Entertainment',
 'Sales and Marketing',
 'Real Estate',
 'Marketplaces',
 'Messaging',
 'Semiconductors',
 'Big Data',
 'Sports',
 'Photography',
 'iPhone',
 'Media',
 'Android',
 'Startups',
 'Medical',
 'Web Development',
 'Retail',
 'Design',
 'Automotive',
 'Content',
 'Events',
 'Location Based Services',
 'Digital Media',
 'Enterprises',
 'Facebook Applications',
 'Collaboration',
 'Reviews and Recommendations',
 'Publishing',
 'Payments',
 'Information Technology',
 'iOS'

In [None]:
# add categories to index
kmeancate_words = add_category_words(kmeancate)

In [None]:
#create final representative category value for each cluster
final = create_final_category(kmeancate_words)

In [None]:
display(
  final.\
  groupby("category_final").\
  agg(fn.count("category_final").alias("count")).\
  filter(col("count") > 1).\
  sort(col("count").desc())
)

DataFrame[category_final: string, count: bigint]

In [None]:
#look for permalinks that occur twice 
display(dfcom.select("permalink").groupBy("permalink").agg(fn.count("permalink")).where(fn.count("permalink") == 2))

DataFrame[permalink: string, count(permalink): bigint]

In [None]:
#investigate duplicates
display(dfcom.filter((col("permalink") == "/organization/prysm")|(col("permalink") == "/organization/treasure-valley-urology-services")))

DataFrame[permalink: string, name: string, homepage_url: string, category_list: string, market: string, funding_total_usd: string, status: string, country_code: string, state_code: string, region: string, city: string, funding_rounds: string, founded_at: string, founded_month: string, founded_quarter: string, founded_year: string, first_funding_at: string, last_funding_at: string]

# Remove duplicates with less information

In [None]:
#select observation with less information of duplicate 1
dfcomsubtr1 = dfcom.filter((col("permalink") == "/organization/prysm") & ((col("permalink") == "/organization/prysm") & (col("funding_rounds") == 1)))

In [None]:
#select observation with less information of duplicate 2
dfcomsubtr2 = dfcom.filter((col("permalink") == "/organization/treasure-valley-urology-services") & ((col("permalink") == "/organization/treasure-valley-urology-services") & (col("funding_rounds") == 1)))

In [None]:
#get rid of duplicates (deprecated entries)
dfcom = dfcom.subtract(dfcomsubtr1).subtract(dfcomsubtr2)

In [None]:
#double check if preparation worked out
display(dfcom.select("permalink").groupBy("permalink").agg(fn.count("permalink")).sort("count(permalink)", ascending=False))

DataFrame[permalink: string, count(permalink): bigint]

# define calculations for aggregation when joining the spreadsheets

In [None]:
#calculate the age of a company based on year of founding, the appropriate quarter and 2015, which is the date when the data set was created
age_calc = fn.when(col("quarter_new") == "Q2", 2015 - col("founded_year") - 0.25).\
  otherwise(fn.when(col("quarter_new") == "Q3", 2015 - col("founded_year") - 0.5).\
            otherwise(fn.when(col("quarter_new") == "Q4", 2015 - col("founded_year") - 0.75).\
                      otherwise(2015 - col("founded_year"))))

In [None]:
#calculate the time to funding of a company based on date of funding and date of data set creation 
time_to_funding_calc = fn.when(col("funded_quarter_new") == "Q2", col("funded_year") + 0.25 - (2015 - col("age"))).\
  otherwise(fn.when(col("funded_quarter_new") == "Q3", col("funded_year") + 0.5 - (2015 - col("age"))).\
            otherwise(fn.when(col("funded_quarter_new") == "Q4", col("funded_year") + 0.75 - (2015 - col("age"))).\
                      otherwise(col("funded_year") - (2015 - col("age")))))

# Defining subsets before joining

In [None]:
#select subsets before joining to exclude information/observations which will not be used
dfcomsub = dfcom.\
  withColumn("quarter_new", col("founded_quarter").substr(6,2)).\
  withColumn("age", age_calc).\
  select("permalink", "name", "market", "funding_total_usd", "status", "country_code", "city", "funding_rounds", "founded_year", "quarter_new","age")

dfinvsub = dfinv.\
  withColumn("funded_quarter_new", col("funded_quarter").substr(6,2)).\
  select("company_permalink", "investor_permalink", "investor_name", "investor_country_code", "funding_round_type", "funded_quarter_new", "funded_year", "raised_amount_usd")

# Join Companies and investment data

In [None]:
#multiple rows in dfinv for each permalink
dfmaster = dfcomsub.join(dfinvsub, dfcomsub["permalink"] == dfinvsub["company_permalink"], 'leftouter')
dfmaster2 = dfmaster.\
  withColumn("time_to_funding", time_to_funding_calc)

In [None]:
# Aggregate multiple entries of each company 
dfmaster2_agg = dfmaster2.\
  groupby(col("permalink").alias("permalink_agg")).\
  agg(fn.count("investor_permalink").alias("count_investor"),
      fn.min("time_to_funding").alias("time_to_first_funding"),
      fn.concat_ws(", ", fn.collect_list(col("investor_country_code"))).alias("investor_country_codes"),
      fn.concat_ws(", ", fn.collect_list(col("funding_round_type"))).alias("funding_round_types"),
      fn.sum("raised_amount_usd").alias("total_raised_usd"))

In [None]:
dfmaster2_agg.count()

49437

# join aggregated values to companies subset dataframe

In [None]:
dfmaster_final = dfcomsub.join(dfmaster2_agg, dfcomsub["permalink"] == dfmaster2_agg["permalink_agg"], 'leftouter')

In [None]:
display(dfmaster_final)

DataFrame[permalink: string, name: string, market: string, funding_total_usd: string, status: string, country_code: string, city: string, funding_rounds: string, founded_year: string, quarter_new: string, age: double, permalink_agg: string, count_investor: bigint, time_to_first_funding: double, investor_country_codes: string, funding_round_types: string, total_raised_usd: double]

In [None]:
dfmaster_final.count()

49437

In [None]:
#examination of observations with target variable acquired
display(dfmaster.where(col("status") == "acquired"))

DataFrame[permalink: string, name: string, market: string, funding_total_usd: string, status: string, country_code: string, city: string, funding_rounds: string, founded_year: string, quarter_new: string, age: double, company_permalink: string, investor_permalink: string, investor_name: string, investor_country_code: string, funding_round_type: string, funded_quarter_new: string, funded_year: string, raised_amount_usd: string]

In [None]:
#drop column which exists twice with different name
dfmaster3 = dfmaster2.drop("company_permalink")

# investigate quarter of founding and funds

In [None]:
#summarize raised amount of USD for each quarter
investperquarter = dfmaster3.\
select("quarter_new", "raised_amount_usd").\
groupby("quarter_new").\
agg(fn.sum("raised_amount_usd")).\
sort("quarter_new")

In [None]:
investperquarternona = investperquarter.na.drop()

In [None]:
#companies which were founded in quarter 1 tend to get more funds than others
display(investperquarternona)

DataFrame[quarter_new: string, sum(raised_amount_usd): double]

# join custom category column to dataframe of companies and investments data

In [None]:
#select permalink, category and kmeans_feat
df_join1 = df_catid_kmeans.\
  select("permalink", "category", "kmeans_feat")

In [None]:
#join to final custom category df
df_final_permas = df_join1.join(final, df_join1["kmeans_feat"] == final["kmean_feat"], 'leftouter')

In [None]:
display(df_final_permas)

DataFrame[permalink: string, category: array<string>, kmeans_feat: int, category_final: string, kmean_feat: double]

In [None]:
#select subset and rename permalink column to ensure unique column names
df_final_permas_sub = df_final_permas.\
  select(col("permalink").alias("permalink_sub"),"category_final")

In [None]:
#join custom categories to master df which is the one resulting from joining companies and investment spreadsheet (inlcuding aggregation)
df_master_final_cate = dfmaster_final.join(df_final_permas_sub, dfmaster_final["permalink"] == df_final_permas_sub["permalink_sub"], 'leftouter')

In [None]:
display(df_master_final_cate)

DataFrame[permalink: string, name: string, market: string, funding_total_usd: string, status: string, country_code: string, city: string, funding_rounds: string, founded_year: string, quarter_new: string, age: double, permalink_agg: string, count_investor: bigint, time_to_first_funding: double, investor_country_codes: string, funding_round_types: string, total_raised_usd: double, permalink_sub: string, category_final: string]

# get majority of investor_country_codes and funding_round_types to reduce complexity

In [None]:
#select subset of appropriate columns
dfmastermajority = df_master_final_cate.select("permalink", "investor_country_codes", "funding_round_types")

In [None]:
#the blank strings have to get converted to NULL for further functions
dfmajority = dfmastermajority.\
  withColumn("investor_country_codes", blank_as_null("investor_country_codes")).\
  withColumn("funding_round_types", blank_as_null("funding_round_types"))

In [None]:
#write investor_country_codes string and funding_round_types strings that a separated by commas into a list
majority = dfmajority.\
  withColumn("investor_country_codes", split(col("investor_country_codes"), ",\s*")).\
  withColumn("funding_round_types", split(col("funding_round_types"), ",\s*"))

In [None]:
#dropping of rows with NULL values
majoritydropinv = majority.select("permalink","investor_country_codes").na.drop()
majoritydropround = majority.select("permalink","funding_round_types").na.drop()

In [None]:
# Counter vectorizing the investor_country_codes and funding_round_types feature in order to use the vector to calculate the majority count for each observation

cv_inv = feature.CountVectorizer(inputCol='investor_country_codes', outputCol='tf')

cv_round = feature.CountVectorizer(inputCol='funding_round_types', outputCol='tf')


cv_inv_model = cv_inv.fit(majoritydropinv)
df_cv_inv = cv_inv_model.transform(majoritydropinv)

cv_round_model = cv_round.fit(majoritydropround)
df_cv_round = cv_round_model.transform(majoritydropround)

In [None]:
#calculate the two highest counts of the investor_country_code for each company 
invtwo = two_highest_catpluscount(df_cv_inv)

In [None]:
#assign the words of the counter vectorizer of the investor_country_codes
inv_words = cv_inv_model.vocabulary

In [None]:
#assign the corresponding investor_country_codes to the counts for each company
invplus_words = add_inv_words(invtwo)

In [None]:
#calculate the two highest counts of the funding_round_types for each company 
roundtwo = two_highest_catpluscount(df_cv_round)

In [None]:
#assign the corresponding funding_round_types to the counts for each company
round_words = cv_round_model.vocabulary

In [None]:
#assign the corresponding funding_round_types to the counts for each company
roundpluswords = add_round_words(roundtwo)

In [None]:
#join the "majority" investor_country_codes to the master table
masternew = df_master_final_cate.join(invplus_words.select("perma",col("category").alias("investor_country_code")), df_master_final_cate["permalink"] == invplus_words["perma"], 'leftouter')

In [None]:
#selection in order to rename and for better joining
roundpluswords = roundpluswords.select(col("perma").alias("permaround"),col("category").alias("funding_round_type"))

In [None]:
#join the "majority" funding_round_types to the master table
masternew = masternew.join(roundpluswords, masternew["permalink"] == roundpluswords["permaround"], 'leftouter')

In [None]:
display(masternew)

DataFrame[permalink: string, name: string, market: string, funding_total_usd: string, status: string, country_code: string, city: string, funding_rounds: string, founded_year: string, quarter_new: string, age: double, permalink_agg: string, count_investor: bigint, time_to_first_funding: double, investor_country_codes: string, funding_round_types: string, total_raised_usd: double, permalink_sub: string, category_final: string, perma: string, investor_country_code: string, permaround: string, funding_round_type: string]

In [None]:
#this dataframe was exported and then imported in another notebook 
#the entire project is split into two notebooks

masterdropped = masternew.drop("funding_total_usd", "permalink_agg", "investor_country_codes", "funding_round_types", "permalink_sub", "perma", "permaround")

In [None]:
masterdropped.count()

49445

In [None]:
display(masterdropped)

DataFrame[permalink: string, name: string, market: string, status: string, country_code: string, city: string, funding_rounds: string, founded_year: string, quarter_new: string, age: double, count_investor: bigint, time_to_first_funding: double, total_raised_usd: double, category_final: string, investor_country_code: string, funding_round_type: string]

# create binary value for target variable

In [None]:
#create new column with 1 or 0 depending on value of target variable: acquired = 1, otherwise 0
finaltarget = masterdropped.\
  withColumn("label", fn.when(col("status") == "acquired",1).otherwise(0))

In [None]:
display(finaltarget)

DataFrame[permalink: string, name: string, market: string, status: string, country_code: string, city: string, funding_rounds: string, founded_year: string, quarter_new: string, age: double, count_investor: bigint, time_to_first_funding: double, total_raised_usd: double, category_final: string, investor_country_code: string, funding_round_type: string, label: int]

# drop missing values

In [None]:
#create subset of dataframe without missing values
finalwithoutna = finaltarget.na.drop()

In [None]:
masternew.toPandas().to_csv('master.csv')