**Initializing Spark**


In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.getOrCreate()

In [3]:
companies_path = r"C:\Users\sachi\Downloads\Exercise\companies.txt"
invest_path = r"C:\Users\sachi\Downloads\Exercise\InvestmentData.csv"

**Reading the data and creating pyspark dataframe**

In [4]:
investment_df = spark.read.option("header",True).csv(invest_path)
companies_df = spark.read.option("header",True).csv(companies_path, sep="\t")

**Creating views**

In [6]:
companies_df.createOrReplaceTempView("companies")
investment_df.createOrReplaceTempView("investment")

**Creating a master_df where investment and companies dataset are joined**

In [95]:
master_df = spark.sql("""
select 
company_permalink,
funding_round_type,
raised_amount_usd,
name,
split(category_list,"[|]")[0] as category_list ,
country_code
 from  investment 
 left join companies
 on
trim(lower(companies.permalink)) = trim(lower(investment.company_permalink))
where raised_amount_usd is not null and category_list is not null and country_code is not null
""" )

**Creating View and caching the master_df since it is used multiple times**

In [96]:
master_df.createOrReplaceTempView("master")
master_df.cache()

DataFrame[company_permalink: string, funding_round_type: string, raised_amount_usd: string, name: string, category_list: string, country_code: string]

**Retriving the funding_round_type which has highest number of investment**

In [101]:
import time
start_time = time.time()
#Suitable Funding type 
spark.sql("""
select 
funding_round_type,
count(*)  as count
from master
group by funding_round_type  order by count desc
"""
).show(5)
print(f"Output fetched in  {time.time() - start_time} seconds")

+------------------+-----+
|funding_round_type|count|
+------------------+-----+
|           venture|47809|
|              seed|21090|
|    debt_financing| 6506|
|             angel| 4400|
|             grant| 1936|
+------------------+-----+
only showing top 5 rows

 Output fetched in  0.3593330383300781 seconds


<br>

**Retriving the top 3 country code for funding_round_type=="venture"**

In [103]:
start_time = time.time()
spark.sql("""
select country_code,sum(raised_amount_usd) as invested_amount
from master
where funding_round_type="venture"
group by country_code order by invested_amount desc
"""
).show(5)
print(f"Output fetched in  {time.time() - start_time} seconds")

+------------+----------------+
|country_code| invested_amount|
+------------+----------------+
|         USA|4.20068029342E11|
|         CHN| 3.9338918773E10|
|         GBR| 2.0072813004E10|
|         IND| 1.4261508718E10|
|         CAN|   9.482217668E9|
+------------+----------------+
only showing top 5 rows

Output fetched in  0.5844223499298096 seconds


<br>

<br>

**Reading the sector data**

In [None]:
sector_path = r"C:\Users\sachi\Downloads\Exercise\mapping.csv"
sector_df =  spark.read.option("header",True).csv(sector_path)

**Creating Pyspark Melt function to convert column names to rows**

In [32]:
from pyspark.sql.types  import *
from pyspark.sql import DataFrame
from pyspark.sql.functions import *
from typing import Iterable 
def melt(
        df: DataFrame, 
        id_vars: Iterable[str], value_vars: Iterable[str], 
        var_name: str="variable", value_name: str="value") -> DataFrame:
    """Convert :class:`DataFrame` from wide to long format."""

    # Create array<struct<variable: str, value: ...>>
    _vars_and_vals = array(*(
        struct(lit(c).alias(var_name), col(c).alias(value_name)) 
        for c in value_vars))

    # Add to the DataFrame and explode
    _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))

    cols = id_vars + [
            col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
    return _tmp.select(*cols)

<br>

**Melting the sector data and creating view**

In [33]:
sector_df = melt(sector_df,['category_list'],[ i for i in sector_df.columns if i not in 'category_list' ]) \
.filter( (col("category_list").isNotNull()) & (col('value')!=0))
sector_df.createOrReplaceTempView("sector")

<br>

**Retriving the top 2 sectors for Country GBR,CHN,USA**

In [108]:
start_time = time.time()
final_df = spark.sql("""

select country_code,main_sector,sector_count,sector_sum from (
select *, row_number() over (partition by country_code order by sector_count desc ) as rank 
from(
select country_code,main_sector,sum(raised_amount_usd) as sector_sum,count(*) as sector_count
from
 master 
left join 
(select regexp_replace(category_list,"0","na") as category_list 
,variable as main_sector from sector where variable is not null)sector
on 
master.category_list=sector.category_list
where funding_round_type="venture" and country_code in ("USA","CHN","GBR")
group by 1,2 order by country_code,sector_count
)
)where rank in (1,2)
"""
         )
final_df.show(10,False)
print(f"Output fetched in  {time.time() - start_time} seconds")

+------------+---------------------------------------+------------+----------------+
|country_code|main_sector                            |sector_count|sector_sum      |
+------------+---------------------------------------+------------+----------------+
|GBR         |Others                                 |516         |4.492219646E9   |
|GBR         |Cleantech / Semiconductors             |437         |5.052849729E9   |
|USA         |Others                                 |8310        |8.2796823598E10 |
|USA         |Cleantech / Semiconductors             |7857        |1.18834869645E11|
|CHN         |Others                                 |468         |9.26235361E9    |
|CHN         |Social, Finance, Analytics, Advertising|281         |9.443388612E9   |
+------------+---------------------------------------+------------+----------------+

Output fetched in  2.0860347747802734 seconds


<br>

**We can partition the data based on main_sector. 
Since the number of sectors are low and limit the number of partitions created at the output.
It is also helpful for the faster retrive of data based on sectors for business purpose to understand the investment**

<br>

<br>

**Explain Plan**

In [110]:
final_df.explain(False)

== Physical Plan ==
*(7) Project [country_code#49, main_sector#8165, sector_count#8167L, sector_sum#8166]
+- *(7) Filter rank#8168 IN (1,2)
   +- Window [row_number() windowspecdefinition(country_code#49, sector_count#8167L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#8168], [country_code#49], [sector_count#8167L DESC NULLS LAST]
      +- *(6) Sort [country_code#49 ASC NULLS FIRST, sector_count#8167L DESC NULLS LAST], false, 0
         +- Exchange hashpartitioning(country_code#49, 200), ENSURE_REQUIREMENTS, [id=#4672]
            +- *(5) Sort [country_code#49 ASC NULLS FIRST, sector_count#8167L ASC NULLS FIRST], true, 0
               +- Exchange rangepartitioning(country_code#49 ASC NULLS FIRST, sector_count#8167L ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [id=#4668]
                  +- *(4) HashAggregate(keys=[country_code#49, main_sector#8165], functions=[sum(cast(raised_amount_usd#21 as double)), count(1)])
                     +-