# Spark Practice

In [1]:
import os
os.getenv("de")

In [2]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

In [3]:
spark=SparkSession.builder.master("local").appName("de").getOrCreate()
conf=SparkConf().setMaster("local").setAppName("de")
sc=SparkContext.getOrCreate()

In [4]:
spark

* Load Parquet file and CSV file

In [5]:
consumerinternParaq = spark.read.parquet("consumerInternet.parquet")

In [6]:
type(consumerinternParaq)

pyspark.sql.dataframe.DataFrame

In [7]:
print(consumerinternParaq.count(), len(consumerinternParaq.columns))

941 10


In [8]:
consumerinternParaq.show(5, truncate=False)

+-----+----------+------------+-----------------+------------------------------------------------------------------------+---------+-------------------------------------------------------------------------------------+---------------+-------------+-------+
|Sr_No|Date      |Startup_Name|Industry_Vertical|SubVertical                                                             |City     |Investors_Name                                                                       |InvestmentnType|Amount_in_USD|Remarks|
+-----+----------+------------+-----------------+------------------------------------------------------------------------+---------+-------------------------------------------------------------------------------------+---------------+-------------+-------+
|152  |01/09/2018|Netmeds     |Consumer Internet|Online Pharmacy Chain                                                   |Chennai  |Sistema Asia Fund, Sistema JSFC and Tanncam Investment                               |Private Equ

In [9]:
consumerinternParaq.printSchema()

root
 |-- Sr_No: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Startup_Name: string (nullable = true)
 |-- Industry_Vertical: string (nullable = true)
 |-- SubVertical: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Investors_Name: string (nullable = true)
 |-- InvestmentnType: string (nullable = true)
 |-- Amount_in_USD: string (nullable = true)
 |-- Remarks: string (nullable = true)



In [10]:
startupFPath="startup.csv"
startupF=spark.read.format("csv").option("inferSchema", "true").option("header", "true").load(startupFPath)

In [11]:
print(startupF.count(), len(startupF.columns))

2103 10


In [12]:
startupF.show(5, truncate=True)

+-----+----------+--------------------+-------------------+--------------------+---------+--------------------+--------------------+-------------+-------+
|Sr_No|      Date|        Startup_Name|  Industry_Vertical|         SubVertical|     City|      Investors_Name|     InvestmentnType|Amount_in_USD|Remarks|
+-----+----------+--------------------+-------------------+--------------------+---------+--------------------+--------------------+-------------+-------+
|    1|09/01/2020|              BYJU’S|             E-Tech|          E-learning|Bengaluru|Tiger Global Mana...|Private Equity Round| 20,00,00,000|   null|
|    2|13/01/2020|              Shuttl|     Transportation|App based shuttle...|  Gurgaon|Susquehanna Growt...|            Series C|    80,48,394|   null|
|    3|09/01/2020|           Mamaearth|         E-commerce|Retailer of baby ...|Bengaluru|Sequoia Capital I...|            Series B|  1,83,58,860|   null|
|    4|02/01/2020|https://www.wealt...|            FinTech|   Online I

In [13]:
startupF.printSchema()

root
 |-- Sr_No: integer (nullable = true)
 |-- Date: string (nullable = true)
 |-- Startup_Name: string (nullable = true)
 |-- Industry_Vertical: string (nullable = true)
 |-- SubVertical: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Investors_Name: string (nullable = true)
 |-- InvestmentnType: string (nullable = true)
 |-- Amount_in_USD: string (nullable = true)
 |-- Remarks: string (nullable = true)



### Compbine consumer and startup files in SQL tables first

* Convert consumer df in to sql table

In [14]:
## Register the dataframe as a table
consumerinternParaq.createOrReplaceTempView("consumerinternParaq")

In [15]:
countconsumerrecords=spark.sql('''
select count(*) from consumerinternParaq
''')

In [16]:
countconsumerrecords.show(5, truncate=False)

+--------+
|count(1)|
+--------+
|941     |
+--------+



* Convert startup df in to sql table

In [17]:
## Register the dataframe as a table
startupF.createOrReplaceTempView("startupF")

In [18]:
countsratuprecords=spark.sql('''
select count(*) from startupF
''')

In [19]:
countsratuprecords.show(5, truncate=False)

+--------+
|count(1)|
+--------+
|2103    |
+--------+



* Combine two SQL tables by UNION clause

In [20]:
sqlwaydfconsumerinternstartup=spark.sql('''
select * from consumerinternParaq UNION select * from startupF
''')

### Here is the combined table in SQL

In [21]:
sqlwaydfconsumerinternstartup.createOrReplaceTempView('sqlwaydfconsumerinternstartup')

In [22]:
temptotalrecords=spark.sql('''
select count(*) from sqlwaydfconsumerinternstartup
''')

In [23]:
temptotalrecords.show(5, truncate=False)

+--------+
|count(1)|
+--------+
|3044    |
+--------+



### Question Answers

### Ans A: How many startups are there in Pune City?

In [24]:
sqlwayconsumerstartupsinpune=spark.sql('''
select count(*) from sqlwaydfconsumerinternstartup where City='Pune'
''')

In [25]:
sqlwayconsumerstartupsinpune.show(5, truncate=False)

+--------+
|count(1)|
+--------+
|105     |
+--------+



### Ans B: How many startups in Pune got their Seed/ Angel Funding?

In [26]:
sqlwayconsumerstartupsinpuneinvestseedangel=spark.sql('''
select count(*) from sqlwaydfconsumerinternstartup where City='Pune' and InvestmentnType='Seed / Angel Funding'
''')

In [27]:
sqlwayconsumerstartupsinpuneinvestseedangel.show(5, truncate=False)

+--------+
|count(1)|
+--------+
|3       |
+--------+



### Ans C: What is the total amount raised by startups in Pune City? Hint - use regex_replace to get rid of null

* Make sure the data is in appropriate format
1. Amount column has (,) commas in it
2. It also has N/A values

In [28]:
sqlwayremovealpha=spark.sql('''
SELECT sum(REGEXP_REPLACE( Amount_in_USD, '[^0-9]+', '' )) AS PuneStartupRaised FROM sqlwaydfconsumerinternstartup where City='Pune'
''')

In [29]:
# sqlwayremovealpha=spark.sql('''
# SELECT REGEXP_REPLACE( Amount_in_USD, '[^0-9]+', '' ) AS commaremovedamount FROM sqlwaydfconsumerinternstartup where City='Pune'
# ''')

In [30]:
sqlwayremovealpha.show(20, truncate=False)

+-----------------+
|PuneStartupRaised|
+-----------------+
|6.33082E8        |
+-----------------+



### Ans D: What are the top 5 Industry_Vertical which has the highest number of startups in India?

In [31]:
# What are the top 5 Industry_Vertical which has the highest number of startups in India?
topfiv = spark.sql('''select Industry_Vertical, count(Startup_Name) sup_count from sqlwaydfconsumerinternstartup group by Industry_Vertical order by sup_count desc limit 5''')

In [32]:
topfiv.show(5, truncate=False)

+-----------------+---------+
|Industry_Vertical|sup_count|
+-----------------+---------+
|Consumer Internet|941      |
|Technology       |478      |
|eCommerce        |186      |
|nan              |171      |
|Healthcare       |70       |
+-----------------+---------+



### Ans E: Find the top Investor(by amount) of each year

In [33]:
# sqlwaytop5investoreachyear=spark.sql('''
# select SUBSTRING(Date, -4, 4) as yearonly, Amount_in_USD from sqlwaydfconsumerinternstartup
# ''')

In [34]:
# sqlwaytop5investoreachyear=spark.sql('''
# select Investors_Name, SUBSTRING(Date, -4, 4) as yearonly from sqlwaydfconsumerinternstartup
# ''')

In [35]:
sqlwayinvyearamount=spark.sql('''
SELECT Investors_Name, SUBSTRING(Date, -4, 4) as yearonly, REGEXP_REPLACE( Amount_in_USD, '[^0-9]+', '' ) AS amountz  FROM sqlwaydfconsumerinternstartup
''')

In [36]:
sqlwayinvyearamount.show(20, truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+--------+
|Investors_Name                                                                                                                                                    |yearonly|amountz |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+--------+
|N/A                                                                                                                                                               |2017    |        |
|Alex Chua, Rahul Garg                                                                                                                                             |2016    |        |
|Undisclosed Investor                                                                

In [37]:
sqlwayinvyearamount.createOrReplaceTempView('sqlwayinvyearamount')

In [38]:
sqlwayinvyearamount.printSchema()

root
 |-- Investors_Name: string (nullable = true)
 |-- yearonly: string (nullable = true)
 |-- amountz: string (nullable = true)



In [39]:
sqlwayinvyearamount.show(5, truncate=False)

+------------------------------------+--------+-------+
|Investors_Name                      |yearonly|amountz|
+------------------------------------+--------+-------+
|N/A                                 |2017    |       |
|Alex Chua, Rahul Garg               |2016    |       |
|Undisclosed Investor                |2016    |75000  |
|Dheeraj Jain & Other angel investors|2016    |400000 |
|Aqeel Ahmed                         |2016    |       |
+------------------------------------+--------+-------+
only showing top 5 rows



In [40]:
sqlwaytop5investor1=spark.sql('''
select yearonly, CAST(max(amountz) AS LONG) as maxinvestofyear from sqlwayinvyearamount group by yearonly  order by maxinvestofyear desc LIMIT 5
''')

In [41]:
sqlwaytop5investor1.show(20, truncate=False)

+--------+---------------+
|yearonly|maxinvestofyear|
+--------+---------------+
|2015    |9600000        |
|2017    |9500000        |
|2020    |9000000        |
|2019    |9000000        |
|2018    |978000         |
+--------+---------------+



In [42]:
sqlwaytop5investor1.createOrReplaceTempView('sqlwaytop5investor1')

In [43]:
# sqlwaygetinvstnamestop5=spark.sql('''
# select * from sqlwayinvyearamount as a inner join sqlwaytop5investor1 as b on a.yearonly= b.yearonly and a.amountz=b.maxinvestofyear
# ''')

In [44]:
sqlwaygetinvstnamestop5=spark.sql('''
select * from sqlwayinvyearamount as a inner join sqlwaytop5investor1 as b on a.yearonly= b.yearonly and a.amountz=b.maxinvestofyear order by b.maxinvestofyear desc
''')

In [45]:
sqlwaygetinvstnamestop5.show(5, truncate=False)

+-------------------------------------+--------+-------+--------+---------------+
|Investors_Name                       |yearonly|amountz|yearonly|maxinvestofyear|
+-------------------------------------+--------+-------+--------+---------------+
|Sequoia Capital, Accel Partners      |2015    |9600000|2015    |9600000        |
|Costanoa, Learn Capital, Jyoti Bansal|2017    |9500000|2017    |9500000        |
|Chiratae Ventures                    |2020    |9000000|2020    |9000000        |
|Matrix Partners                      |2019    |9000000|2019    |9000000        |
|SAR Group                            |2018    |978000 |2018    |978000         |
+-------------------------------------+--------+-------+--------+---------------+



## Bonus Questions and Answer

### Ans 1. Find the top startup(by amount raised) from each city?

In [50]:
sqlwaytopstrtbycity=spark.sql('''
SELECT Startup_Name,City, REGEXP_REPLACE( Amount_in_USD, '[^0-9]+', '' ) AS amountz  FROM sqlwaydfconsumerinternstartup 
''')

In [51]:
sqlwaytopstrtbycity.show(5, truncate=False)

+----------------------+---------+-------+
|Startup_Name          |City     |amountz|
+----------------------+---------+-------+
|Cube Consumer Services|Mumbai   |       |
|Pitstop               |Bangalore|       |
|GoGo Truck            |Chennai  |75000  |
|Justride              |Mumbai   |400000 |
|GlamStudios           |Noida    |       |
+----------------------+---------+-------+
only showing top 5 rows



In [54]:
sqlwaytopstrtbycity.printSchema()

root
 |-- Startup_Name: string (nullable = true)
 |-- City: string (nullable = true)
 |-- amountz: string (nullable = true)



In [53]:
sqlwaytopstrtbycity.createOrReplaceTempView('sqlwaytopstrtbycity')

In [59]:
# temp3=spark.sql('''
# select distinct(City)  from sqlwaytopstrtbycity 
# ''').show()

In [68]:
sqlwaytopstartcityaggre=spark.sql('''
select City, CAST(max(amountz) AS LONG) as maxofcity  from sqlwaytopstrtbycity group by City order by maxofcity desc 
''')

In [69]:
sqlwaytopstartcityaggre.show(truncate=False)

+-------------------+----------+
|City               |maxofcity |
+-------------------+----------+
|\\xc2\\xa0Noida    |2020000000|
|\\xc2\\xa0Mumbai   |2019350000|
|\\xc2\\xa0Bangalore|2016200000|
|Menlo Park         |450000000 |
|California         |300000000 |
|Kormangala         |283000000 |
|Nairobi            |273903468 |
|Faridabad          |231000000 |
|India/Singapore    |226000000 |
|Tulangan           |200000000 |
|San Jose,          |135000000 |
|Santa Monica       |110000000 |
|New Delhi / US     |63000000  |
|Singapore          |60000000  |
|New York           |52000000  |
|Palo Alto          |51000000  |
|India              |42000000  |
|Taramani           |38080000  |
|San Francisco      |37000000  |
|Surat              |36230000  |
+-------------------+----------+
only showing top 20 rows



In [63]:
sqlwaytopstartcityaggre.createOrReplaceTempView('sqlwaytopstartcityaggre')

In [65]:
sqlwayamountraisedbystartupinthecity=spark.sql('''
select * from sqlwaytopstrtbycity as a inner join sqlwaytopstartcityaggre as b on a.City= b.City and a.amountz=b.maxofcity order by b.maxofcity desc
''')

In [67]:
sqlwayamountraisedbystartupinthecity.show(truncate=False)

+-------------------------------+-------------------+----------+-------------------+----------+
|Startup_Name                   |City               |amountz   |City               |maxofcity |
+-------------------------------+-------------------+----------+-------------------+----------+
|\\xc2\\xa0News in shorts       |\\xc2\\xa0Noida    |2020000000|\\xc2\\xa0Noida    |2020000000|
|\\xc2\\xa0Loylty Rewards       |\\xc2\\xa0Mumbai   |2019350000|\\xc2\\xa0Mumbai   |2019350000|
|\\xc2\\xa0Bluestone            |\\xc2\\xa0Bangalore|2016200000|\\xc2\\xa0Bangalore|2016200000|
|GOQii                          |Menlo Park         |450000000 |Menlo Park         |450000000 |
|Automation Anywhere            |California         |300000000 |California         |300000000 |
|Vogo Automotive                |Kormangala         |283000000 |Kormangala         |283000000 |
|Sistema.bio                    |Nairobi            |273903468 |Nairobi            |273903468 |
|Lenskart.com                   |Faridab

### Ans 2. Which SubVertical had the highest growth(in number of startups) over the years?

In [70]:
sqlwaydfconsumerinternstartup.printSchema()

root
 |-- Sr_No: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Startup_Name: string (nullable = true)
 |-- Industry_Vertical: string (nullable = true)
 |-- SubVertical: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Investors_Name: string (nullable = true)
 |-- InvestmentnType: string (nullable = true)
 |-- Amount_in_USD: string (nullable = true)
 |-- Remarks: string (nullable = true)



In [93]:
sqlwaystarupgrowth1=spark.sql('''
select SubVertical, count(*) as noofstartups from sqlwaydfconsumerinternstartup where SubVertical!='nan' group by SubVertical order by noofstartups desc
''').show(5, truncate=False)

+-----------------------+------------+
|SubVertical            |noofstartups|
+-----------------------+------------+
|Online Lending Platform|11          |
|Online Pharmacy        |10          |
|Food Delivery Platform |8           |
|Online lending platform|5           |
|Education              |5           |
+-----------------------+------------+
only showing top 5 rows



In [82]:
sqlwaystarupgrowth1=spark.sql('''
select SubVertical, count(*) as noofstartups from sqlwaydfconsumerinternstartup where SubVertical!='nan' group by SubVertical order by noofstartups desc limit 1
''').show(truncate=False)

+-----------------------+------------+
|SubVertical            |noofstartups|
+-----------------------+------------+
|Online Lending Platform|11          |
+-----------------------+------------+



### Ans 3: Which SubVertical had the highest growth(in funding) over the years?

In [97]:
sqlwaysubverticleamount1=spark.sql('''
SELECT SubVertical, REGEXP_REPLACE( Amount_in_USD, '[^0-9]+', '' ) AS amountz  FROM sqlwaydfconsumerinternstartup 
''')

In [98]:
sqlwaysubverticleamount1.show(5, truncate=False)

+------------------------------------------------+-------+
|SubVertical                                     |amountz|
+------------------------------------------------+-------+
|Finance management Mobile app                   |       |
|Online Car Repair and Servicing Booking platform|       |
|Truck Aggregator & Booking platform             |75000  |
|Self Drive Car Rental                           |400000 |
|Online Salon Aggregator                         |       |
+------------------------------------------------+-------+
only showing top 5 rows



In [99]:
sqlwaysubverticleamount1.createOrReplaceTempView('sqlwaysubverticleamount1')

In [100]:
sqlwaysubverticleamount2=spark.sql('''
SELECT SubVertical, sum(amountz) as totalamountraisedforsubvert FROM sqlwaysubverticleamount1 where SubVertical!='nan'  group by SubVertical order by totalamountraisedforsubvert desc
''')

In [101]:
sqlwaysubverticleamount2.show(20, truncate=False)

+----------------------------------+---------------------------+
|SubVertical                       |totalamountraisedforsubvert|
+----------------------------------+---------------------------+
|Bike Taxi                         |3.9E9                      |
|Online Marketplace                |2.6487E9                   |
|ECommerce Marketplace             |1.7E9                      |
|E-Books                           |1.51095E9                  |
|Mobile Wallet & ECommerce platform|1.46E9                     |
|Mobile Wallet                     |1.057E9                    |
|Private Equity Firm               |6.0E8                      |
|Business development              |5.85E8                     |
|Hospitality                       |4.88997554E8               |
|Wearable Fitness Bands            |4.5E8                      |
|App based cab aggregator          |3.3E8                      |
|Online Grocery & Food Store       |3.0E8                      |
|Robotics                

In [102]:
sqlwaysubverticleamount3=spark.sql('''
SELECT SubVertical, sum(amountz) as totalamountraisedforsubvert FROM sqlwaysubverticleamount1 where SubVertical!='nan'  group by SubVertical order by totalamountraisedforsubvert desc limit 1
''')

In [103]:
sqlwaysubverticleamount3.show(truncate=False)

+-----------+---------------------------+
|SubVertical|totalamountraisedforsubvert|
+-----------+---------------------------+
|Bike Taxi  |3.9E9                      |
+-----------+---------------------------+

