# Spark Test


In [1]:
import os
os.getenv("de")

In [2]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

In [3]:
spark=SparkSession.builder.master("local").appName("de").getOrCreate()
conf=SparkConf().setMaster("local").setAppName("de")
sc=SparkContext.getOrCreate()

In [4]:
spark

* Load Parquet file and CSV file

In [5]:
consumerinternParaq = spark.read.parquet("consumerInternet.parquet")

In [6]:
print(consumerinternParaq.count(), len(consumerinternParaq.columns))

941 10


In [7]:
consumerinternParaq.show(5, truncate=True)

+-----+----------+------------+-----------------+--------------------+---------+--------------------+---------------+-------------+-------+
|Sr_No|      Date|Startup_Name|Industry_Vertical|         SubVertical|     City|      Investors_Name|InvestmentnType|Amount_in_USD|Remarks|
+-----+----------+------------+-----------------+--------------------+---------+--------------------+---------------+-------------+-------+
|  152|01/09/2018|     Netmeds|Consumer Internet|Online Pharmacy C...|  Chennai|Sistema Asia Fund...| Private Equity|  3,50,00,000|    nan|
|  154|03/09/2018|  Daily hunt|Consumer Internet|News and ebooks M...|Bengaluru|         Falcon Edge| Private Equity|    63,90,000|    nan|
|  156|04/09/2018| HappyGoEasy|Consumer Internet| Online Travel Agecy| Gurugram|Korea Investment ...| Private Equity|          N/A|    nan|
|  157|05/09/2018|       Nykaa|Consumer Internet|Online Marketplac...|   Mumbai|          Lighthouse| Private Equity| 15,72,00,000|    nan|
|  159|06/09/2018|  

In [8]:
consumerinternParaq.printSchema()

root
 |-- Sr_No: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Startup_Name: string (nullable = true)
 |-- Industry_Vertical: string (nullable = true)
 |-- SubVertical: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Investors_Name: string (nullable = true)
 |-- InvestmentnType: string (nullable = true)
 |-- Amount_in_USD: string (nullable = true)
 |-- Remarks: string (nullable = true)



In [9]:
startupFPath="startup.csv"
startupF=spark.read.format("csv").option("inferSchema", "true").option("header", "true").load(startupFPath)

In [10]:
print(startupF.count(), len(startupF.columns))

2103 10


In [11]:
startupF.show(5, truncate=True)

+-----+----------+--------------------+-------------------+--------------------+---------+--------------------+--------------------+-------------+-------+
|Sr_No|      Date|        Startup_Name|  Industry_Vertical|         SubVertical|     City|      Investors_Name|     InvestmentnType|Amount_in_USD|Remarks|
+-----+----------+--------------------+-------------------+--------------------+---------+--------------------+--------------------+-------------+-------+
|    1|09/01/2020|              BYJU’S|             E-Tech|          E-learning|Bengaluru|Tiger Global Mana...|Private Equity Round| 20,00,00,000|   null|
|    2|13/01/2020|              Shuttl|     Transportation|App based shuttle...|  Gurgaon|Susquehanna Growt...|            Series C|    80,48,394|   null|
|    3|09/01/2020|           Mamaearth|         E-commerce|Retailer of baby ...|Bengaluru|Sequoia Capital I...|            Series B|  1,83,58,860|   null|
|    4|02/01/2020|https://www.wealt...|            FinTech|   Online I

In [12]:
startupF.printSchema()

root
 |-- Sr_No: integer (nullable = true)
 |-- Date: string (nullable = true)
 |-- Startup_Name: string (nullable = true)
 |-- Industry_Vertical: string (nullable = true)
 |-- SubVertical: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Investors_Name: string (nullable = true)
 |-- InvestmentnType: string (nullable = true)
 |-- Amount_in_USD: string (nullable = true)
 |-- Remarks: string (nullable = true)



## 1. Combine the Consumer Itnernet and Startup dataframes

In [13]:
dfconsumerinternstartup=consumerinternParaq.union(startupF)

In [14]:
print(dfconsumerinternstartup.count(), len(dfconsumerinternstartup.columns))

3044 10


In [15]:
dfconsumerinternstartup.filter(dfconsumerinternstartup.City == 'Pune').show(5,truncate=False)

+-----+----------+------------+-----------------+---------------------------------------+----+------------------------------------------------------+--------------------+-------------+-------+
|Sr_No|Date      |Startup_Name|Industry_Vertical|SubVertical                            |City|Investors_Name                                        |InvestmentnType     |Amount_in_USD|Remarks|
+-----+----------+------------+-----------------+---------------------------------------+----+------------------------------------------------------+--------------------+-------------+-------+
|215  |24/07/2018|Digit       |Consumer Internet|Online Insurance Startup               |Pune|Fairfax                                               |Private Equity      |4,50,00,000  |nan    |
|263  |11/05/2018|Earth Food  |Consumer Internet|Agri-tech                              |Pune|Rairah Corporation                                    |Seed / Angel Funding|9,49,000     |nan    |
|325  |04/03/2018|Livehealth  |Cons

## Ans A: Total Startups is Pune City are

In [16]:
startupInPune=dfconsumerinternstartup.filter(dfconsumerinternstartup.City == 'Pune').count()

In [17]:
startupInPune

105

## Ans B: Total number of startups who get funds for Seed and Angel Investment Type

In [18]:
dfconsumerinternstartup.filter(dfconsumerinternstartup.InvestmentnType == 'Seed' ).count()

4

In [19]:
dfconsumerinternstartup.filter(dfconsumerinternstartup.InvestmentnType == 'Angel' ).count()

1

In [20]:
dfconsumerinternstartup.filter( (dfconsumerinternstartup.InvestmentnType == 'Seed') | (dfconsumerinternstartup.InvestmentnType == 'Angel') ).count()

5

## Ans C: Amout Raised by startups in Pune

* First chek there are any records with null or None values   (watchou there are N/A non standard Null values)

In [21]:
punestartups=dfconsumerinternstartup.filter(dfconsumerinternstartup.City == 'Pune')

In [22]:
punestartups.show(5,truncate=False)

+-----+----------+------------+-----------------+---------------------------------------+----+------------------------------------------------------+--------------------+-------------+-------+
|Sr_No|Date      |Startup_Name|Industry_Vertical|SubVertical                            |City|Investors_Name                                        |InvestmentnType     |Amount_in_USD|Remarks|
+-----+----------+------------+-----------------+---------------------------------------+----+------------------------------------------------------+--------------------+-------------+-------+
|215  |24/07/2018|Digit       |Consumer Internet|Online Insurance Startup               |Pune|Fairfax                                               |Private Equity      |4,50,00,000  |nan    |
|263  |11/05/2018|Earth Food  |Consumer Internet|Agri-tech                              |Pune|Rairah Corporation                                    |Seed / Angel Funding|9,49,000     |nan    |
|325  |04/03/2018|Livehealth  |Cons

* Remove the commas and NAs from Amount_in_USD column

In [23]:
from pyspark.sql.functions import *

punestartups1=punestartups.withColumn('Amount_in_USD', regexp_replace('Amount_in_USD',',',''))
# punestartups1.show(truncate=False)


punestartups2=punestartups1.withColumn('Amount_in_USD', regexp_replace('Amount_in_USD','N/A','0'))
punestartups2.show(5, truncate=False)

+-----+----------+------------+-----------------+---------------------------------------+----+------------------------------------------------------+--------------------+-------------+-------+
|Sr_No|Date      |Startup_Name|Industry_Vertical|SubVertical                            |City|Investors_Name                                        |InvestmentnType     |Amount_in_USD|Remarks|
+-----+----------+------------+-----------------+---------------------------------------+----+------------------------------------------------------+--------------------+-------------+-------+
|215  |24/07/2018|Digit       |Consumer Internet|Online Insurance Startup               |Pune|Fairfax                                               |Private Equity      |45000000     |nan    |
|263  |11/05/2018|Earth Food  |Consumer Internet|Agri-tech                              |Pune|Rairah Corporation                                    |Seed / Angel Funding|949000       |nan    |
|325  |04/03/2018|Livehealth  |Cons

* Total amount raised by startup in Pune city

In [24]:
import pyspark.sql.functions as F     

punestartups2.agg(F.sum("Amount_in_USD")).collect()[0][0]

633082000.0

## Ans D: Top 5 Industry_Verticle startups

In [25]:
industry_verticle_startups=dfconsumerinternstartup.select('Startup_Name','Industry_Vertical')

In [26]:
industry_verticle_startups.groupBy('Industry_Vertical').count().orderBy(col('count').desc()).show(5,truncate=False)

+-----------------+-----+
|Industry_Vertical|count|
+-----------------+-----+
|Consumer Internet|941  |
|Technology       |478  |
|eCommerce        |186  |
|nan              |171  |
|Healthcare       |70   |
+-----------------+-----+
only showing top 5 rows



## Ans E: Top Investor each year

In [27]:
dfconsumerinternstartup.show(5,truncate=False)

+-----+----------+------------+-----------------+------------------------------------------------------------------------+---------+-------------------------------------------------------------------------------------+---------------+-------------+-------+
|Sr_No|Date      |Startup_Name|Industry_Vertical|SubVertical                                                             |City     |Investors_Name                                                                       |InvestmentnType|Amount_in_USD|Remarks|
+-----+----------+------------+-----------------+------------------------------------------------------------------------+---------+-------------------------------------------------------------------------------------+---------------+-------------+-------+
|152  |01/09/2018|Netmeds     |Consumer Internet|Online Pharmacy Chain                                                   |Chennai  |Sistema Asia Fund, Sistema JSFC and Tanncam Investment                               |Private Equ

* Note: Date column is not of type Date (it is string)

In [28]:
dfconsumerinternstartup.printSchema()

root
 |-- Sr_No: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Startup_Name: string (nullable = true)
 |-- Industry_Vertical: string (nullable = true)
 |-- SubVertical: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Investors_Name: string (nullable = true)
 |-- InvestmentnType: string (nullable = true)
 |-- Amount_in_USD: string (nullable = true)
 |-- Remarks: string (nullable = true)



* Remove the commas and NAs from Amount_in_USD column

In [29]:
from pyspark.sql.functions import *

dfcommaremovedamount=dfconsumerinternstartup.withColumn('Amount_in_USD', regexp_replace('Amount_in_USD',',',''))



dfnaremovedamount=dfcommaremovedamount.withColumn('Amount_in_USD', regexp_replace('Amount_in_USD','N/A','0'))
dfnaremovedamount.show(5, truncate=False)

+-----+----------+------------+-----------------+------------------------------------------------------------------------+---------+-------------------------------------------------------------------------------------+---------------+-------------+-------+
|Sr_No|Date      |Startup_Name|Industry_Vertical|SubVertical                                                             |City     |Investors_Name                                                                       |InvestmentnType|Amount_in_USD|Remarks|
+-----+----------+------------+-----------------+------------------------------------------------------------------------+---------+-------------------------------------------------------------------------------------+---------------+-------------+-------+
|152  |01/09/2018|Netmeds     |Consumer Internet|Online Pharmacy Chain                                                   |Chennai  |Sistema Asia Fund, Sistema JSFC and Tanncam Investment                               |Private Equ

* Note: Convert Date column into Only Year column

In [30]:
dfonlyyear = (dfnaremovedamount.withColumn('Date', substring('Date', -4, 4)  ))
dfonlyyear.show(5, truncate=False)

+-----+----+------------+-----------------+------------------------------------------------------------------------+---------+-------------------------------------------------------------------------------------+---------------+-------------+-------+
|Sr_No|Date|Startup_Name|Industry_Vertical|SubVertical                                                             |City     |Investors_Name                                                                       |InvestmentnType|Amount_in_USD|Remarks|
+-----+----+------------+-----------------+------------------------------------------------------------------------+---------+-------------------------------------------------------------------------------------+---------------+-------------+-------+
|152  |2018|Netmeds     |Consumer Internet|Online Pharmacy Chain                                                   |Chennai  |Sistema Asia Fund, Sistema JSFC and Tanncam Investment                               |Private Equity |35000000     |nan  

In [31]:
from pyspark.sql.types import *

dfonlyyearamoutnum=dfonlyyear.withColumn(  'Amount_in_USD',   dfonlyyear.Amount_in_USD.cast(LongType())  )

In [32]:
dfonlyyearamoutnum.printSchema()

root
 |-- Sr_No: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Startup_Name: string (nullable = true)
 |-- Industry_Vertical: string (nullable = true)
 |-- SubVertical: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Investors_Name: string (nullable = true)
 |-- InvestmentnType: string (nullable = true)
 |-- Amount_in_USD: long (nullable = true)
 |-- Remarks: string (nullable = true)



In [33]:
dfmaxamountofyear=dfonlyyearamoutnum.groupBy('Date').max('Amount_in_USD')

In [34]:
dfmaxamountofyear.show(truncate=False)

+----+------------------+
|Date|max(Amount_in_USD)|
+----+------------------+
|2016|200000000         |
|2020|200000000         |
|2019|3900000000        |
|2017|2500000000        |
|/015|630000            |
|2018|600000000         |
|2015|700000000         |
+----+------------------+



In [35]:
top_investor_yearwise =dfmaxamountofyear.join(dfonlyyear, dfmaxamountofyear.Date == dfonlyyear.Date).show(2)

+----+------------------+-----+----+------------+-----------------+--------------------+---------+--------------------+---------------+-------------+-------+
|Date|max(Amount_in_USD)|Sr_No|Date|Startup_Name|Industry_Vertical|         SubVertical|     City|      Investors_Name|InvestmentnType|Amount_in_USD|Remarks|
+----+------------------+-----+----+------------+-----------------+--------------------+---------+--------------------+---------------+-------------+-------+
|2018|         600000000|  152|2018|     Netmeds|Consumer Internet|Online Pharmacy C...|  Chennai|Sistema Asia Fund...| Private Equity|     35000000|    nan|
|2018|         600000000|  154|2018|  Daily hunt|Consumer Internet|News and ebooks M...|Bengaluru|         Falcon Edge| Private Equity|      6390000|    nan|
+----+------------------+-----+----+------------+-----------------+--------------------+---------+--------------------+---------------+-------------+-------+
only showing top 2 rows

