Apache Spark
-----

Apache Spark is an open-source, distributed computing system that provides a fast and general-purpose cluster-computing framework for big data processing. It was developed to address the limitations and challenges posed by traditional MapReduce-based systems by introducing a more flexible and efficient architecture. The goal of this assignment is to familiarize you with Apache Spark operations through practical exercises.

In [None]:
# importing required libraries
import os
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import col, substring, avg, mean, sum, round, count, collect_list, last, when, lower, regexp_replace, current_date, datediff, when, to_date, expr, format_number,trim, desc, first
from pyspark.sql.types import StructType, StructField, StringType, DecimalType, BooleanType, FloatType, DoubleType, TimestampType, IntegerType, LongType


from pyspark.ml.feature import Imputer, VectorAssembler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

In [None]:
# Ensure the dataset is present (creates ./dataset/*.csv)
from downloader import ensure_dataset

ensure_dataset()


In [None]:
!echo $SPARK_MASTER_ADDRESS

In [None]:
spark = SparkSession.builder.master(os.getenv('SPARK_MASTER_ADDRESS')).appName("Spark-application").config("spark.ui.enabled", "false").getOrCreate()

In [None]:
!echo $SPARK_MASTER_OOD_ADDRESS
!echo $SPARK_WORKER1_OOD_ADDRESS
!echo $SPARK_WORKER2_OOD_ADDRESS

# Note: 

1. We have provided many hints on which functions to use for a particular task. If you feel like you can solve it without those methods or using different methods, you can do so.

2. Also, if you feel like that some columns should have been type converted or dropped and are not done during this exercise, feel free to do so. You can provide a small explanation.

3. The feature columns should be the following columns. Some are already present in the dataset and some are derived below in the exercise:

    'investment_rounds',
    'invested_companies',
    'funding_rounds',
    'relationships',
    'age_of_company',
    'total_amount_raised',
    'num_acquisitions',
    'have_been_acquired',
    'fin_org_financed',
    'person_financed',
    'startup_financed',
    'num_products',
    'category_code_index',
    'country_code_index'

4. If before creating feature vector, you have other columns in your dataset than mentioned above, drop them. If you want, you can include them in features, but mention it in the notebook.

5. Your code should be error free as we will run each cell while grading.

## Dataset

The database is composed of 5 tables containing many aspects related to the startup world from 1901-01-01 to 2014-10-01. Each table in the dataset represents a different aspect of the startup ecosystem, detailing the interactions, events, and entities involved in startup funding, growth, and development.

-------
### Objects

A broad table that likely serves as a central repository for entities in the dataset, including companies, startups, and perhaps other organizational forms. It includes detailed information on each entity, such as its status, industry category, funding received, and key milestones.
It contains 40 variables, of which the most important are name, entity_type, category_code, status, founded_at, country_code, state_code, investment_rounds, invested_companies, funding_rounds.

------
### Investments

Tracks investment transactions, detailing which entities (investors) have invested in which companies or startups during particular funding rounds.


-------
<!-- Offices
-----
geographic position of main offices (both of the companies and the investment funds). -->

### Funding Rounds

Captures details about specific funding events where startups or companies receive investment. (funded company, date and funding type, total raised amount, number of participants)

-----

### Relationships

Tracks the connections between individuals (people) and entities (companies, startups), including roles or positions held by individuals within companies, indicating the network of professionals in the startup ecosystem. (people, institutions, start and end date of relationship, role held)

------
### Acquisitions

Details about acquisition events where one company purchases another. (acquired company, acquiring company, price and date of acquisition, payment method)

-----

Here are some links to documentation which would be helpful for the tasks:


Pyspark sql datframe: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/dataframe.html


Pyspark sql functions: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html



# Exercise

## 1 .Read the following files to the Spark DataFrame and select given columns


1. **objects.csv**

   -  **Columns:** "id",
    "entity_type",
    "entity_id",
    "parent_id",
    "name",
    "category_code",
    "status",
    "founded_at",
    "closed_at",
    "country_code",
    "state_code",
    "city",
    "region",
    "investment_rounds",
    "invested_companies",
    "first_funding_at",
    "last_funding_at",
    "funding_rounds",
    "funding_total_usd",
    "participants",
    "relationships"
  
2. **acquisitions.csv**

   - **Columns:** 'id', 'acquisition_id', 'acquiring_object_id', 'acquired_object_id',
       'term_code', 'price_amount', 'price_currency_code', 'acquired_at'
  
2. **funding_rounds.csv**

   - **Columns:** "object_id",
    "funded_at",
    "funding_round_type",
    "funding_round_code",
    "raised_amount_usd",
    "is_first_round",
    "is_last_round"
    
2. **investments.csv**

   - **Columns:** 'funding_round_id', 'funded_object_id', 'investor_object_id'
  
2. **relationships.csv**

   - **Columns:** "id",
    "person_object_id",
    "relationship_object_id",
    "start_at",
    "end_at",
    "is_past",
    "title"

##### Read following csv files to the spark dataframe

Hint: Use .select() to select the required columns

In [None]:
#objects
# TODO: Your code starts here

# TODO: Your code ends here



In [None]:
#acquisitions
# TODO: Your code starts here

# TODO: Your code ends here



In [None]:
#funding_rounds
# TODO: Your code starts here

# TODO: Your code ends here



In [None]:
# investments
# TODO: Your code starts here

# TODO: Your code ends here



In [None]:
#relationships
# TODO: Your code starts here

# TODO: Your code ends here



## 2.1 Clean objects table


1. Type conversion: convert the datatype of given columns
        
        Column with dates: datetime type

        investment_rounds, invested_companies, funding_rounds, funding_total_usd, milestones, relationships: Numeric


2. Find Age of company in years.

        
        To calculate the age of a company, subtract the date in the 'founded_at' column from the date in the 'closed_at' column. If the 'closed_at' date is not provided (NULL), use the current date(today's date) instead.  If 'founded_at' date is NULL, then age of company will be NULL.
        After the above operation fill the NULL values in 'age_of_company' column with median age.
        Drop 'founded_at' and 'closed_at' after deriving the new column.


3. Handle Variation. 

        The 'status' column in objects dataframe categorizes a company's current operational phase, such as 'operating', 'ipo', 'acquired', or 'closed', reflecting its lifecycle stage.
        
        To ensure uniformity in company status reporting, categorize all entries in the 'status' column into one of four standardized categories: 'operating', 'ipo', 'acquired', or 'closed'. You need to map various existing status names, which might currently reflect the same operational state in different terminology. Display the count of each status after modification. 





Hint: Use methods withColumn, col, cast for type conversion.

In [None]:
#1.1 Type conversion to timestamp
# TODO: Your code starts here

# TODO: Your code ends here



In [None]:
#1.2 Type conversion to integer
# TODO: Your code starts here

# TODO: Your code ends here



Hint: Some function used will be when, col, datediff, currentdate

In [None]:
# 2. Age_of_company
# TODO: Your code starts here

# TODO: Your code ends here



In [None]:
# 3. Map status to predefined categories
# TODO: Your code starts here

# TODO: Your code ends here



## 2.2 Clean Funding_rounds table

1. Cleaning raised_amount_usd column:

       The raised_amount_usd has values in inconsistent formatting with special characters. Remove those and create a consistent formatting of values.

2. Filling NULL values in raised_amount_usd column:

        Calculate the average 'raised_amount_usd' for each 'funding_round_type'. For each funding type, replace missing or null values in the raised_amount_usd column with the corresponding average amount raised for that funding type.

3. Create new column 'total_amount_raised' by aggregating the 'raised_amount_usd' column based on 'object_id'.

3. Display the dataframe


Hint: Use groupBy() and mean()

In [None]:
#Fix values for column: raised_amount_usd
# TODO: Your code starts here

# TODO: Your code ends here



In [None]:
# Aggregate raised_amount_usd
# TODO: Your code starts here

# TODO: Your code ends here



In [None]:
#display dataframe
# TODO: Your code starts here

# TODO: Your code ends here



## 3. Print duplicate rows count and remove those rows from each dataframe

In [None]:
#Drop Duplicates
# TODO: Your code starts here

# TODO: Your code ends here



## 4. Splitting the Objects Table

The object dataset consists of information about startups, including details about their products and the nature of their relationships with financial organizations and persons.

We would like to create different entities based on the entity_type. Using these entities we will derive new features which will be later used in training.



#### **Question: Divide the 'objects' dataset into four distinct datasets based on the 'entity_type' column. Also display count of rows for each dataset.**

Hint: Use filter() method to filter rows using the given condition.

In [None]:
#Create startups
# TODO: Your code starts here

# TODO: Your code ends here



In [None]:
#Create financial_org
# TODO: Your code starts here

# TODO: Your code ends here



In [None]:
# Create products
# TODO: Your code starts here

# TODO: Your code ends here



In [None]:
# Create persons
# TODO: Your code starts here

# TODO: Your code ends here



## 5. Derive new features

This part will require you to apply joins and derive aggregated features by grouping data.


Hint: Use groupBy() to group and agg() to aggregate over dataframe, alias() to give new name to column, join() to join tables if required.

### 5.1

You are provided with dataset named 'acquisitions'.

    i) The column 'acquiring_object_id' indicates the entity that has made the acquisition. Group the data based on the 'acquiring_object_id'. For each group, count the total number of acquisitions made. Rename the aggregated count to 'num_acquisitions'. 
    Your final output should be a DataFrame named 'acquiring' that lists each 'acquiring_object_id' alongside the corresponding 'num_acquisitions'. Display the dataframe.

 
    ii) The column 'acquired_object_id' specifies the entity that was acquired. Group the dataset by 'acquired_object_id' to organize the data by each entity that has been acquired. Compute the count of acquisitions for each group. Rename the aggregated count to 'have_been_acquired'. 
    Your final output should be a DataFrame named 'acquired' that lists each 'acquired_object_id' alongside the corresponding 'have_been_acquired'. Display the dataframe.

In [None]:
# create num_acquisitions
# TODO: Your code starts here

# TODO: Your code ends here



In [None]:
# display acquiring df
# TODO: Your code starts here

# TODO: Your code ends here



In [None]:
# have_been_acquired
# TODO: Your code starts here

# TODO: Your code ends here



In [None]:
# display acquired df
# TODO: Your code starts here

# TODO: Your code ends here



### 5.2

    Utilize the 'investments' and 'financial_org' datasets to identify how many times each entity has been financed by financial organizations.
  
    Your final output should be a DataFrame named 'finorgs' that lists each 'funded_object_id' alongside the corresponding 'fin_org_financed'. Display the dataframe.

    

In [None]:
# create finorgs
# TODO: Your code starts here

# TODO: Your code ends here



In [None]:
# display finorgs
# TODO: Your code starts here

# TODO: Your code ends here



### 5.3

    Determine the number of investments made by individuals in various entities using the 'investments' and 'persons' datasets. 

    Your final output should be a DataFrame named 'num_persons' that lists each 'funded_object_id' alongside the corresponding 'person_financed'. Display the dataframe.

In [None]:
# create num_persons
# TODO: Your code starts here

# TODO: Your code ends here



In [None]:
# display num_persons
# TODO: Your code starts here

# TODO: Your code ends here



### 5.4

    Calculate how many times each entity has been financed by startups using the 'investments' and 'startups' datasets.

    Your final output should be a DataFrame named 'nstartup' that lists each 'funded_object_id' alongside the corresponding 'startup_financed'. Display the dataframe.

In [None]:
# create nstartup
# TODO: Your code starts here

# TODO: Your code ends here



In [None]:
# display nstartup
# TODO: Your code starts here

# TODO: Your code ends here



### 5.5

    Determine the number of products associated with each parent entity using the 'products' dataset.

      Your final output should be a DataFrame named 'nproducts' that lists each 'parent_id' alongside the corresponding 'num_products'. Display the dataframe.

In [None]:
# create nproducts
# TODO: Your code starts here

# TODO: Your code ends here



In [None]:
# display nproducts
# TODO: Your code starts here

# TODO: Your code ends here



## 6. Joins

You will join all the tables created above and create a final dataset. The name of the final dataset will be train_data. 

Joins in spark: https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.join.html

 ### 6.1
 
    Combine the 'startups' and 'funding_rounds' datasets to analyze funding details for each startup. Your output should be a DataFrame named 'train_data'. Print columns of final dataset.

In [None]:
# TODO: Your code starts here

# TODO: Your code ends here



### 6.2

    Join the 'acquiring' and 'acquired' DataFrames with the existing 'train_data' DataFrame to integrate data on acquisitions. Print columns of final dataset. You don't require the id through which you are joining these tables. Drop those columns after joining.

In [None]:
# TODO: Your code starts here

# TODO: Your code ends here



### 6.3

    Join the 'train_data' DataFrame by merging it with 'finorgs', 'num_persons', 'nstartup', and 'nproducts' DataFrames based on relevant ID matches and streamline the merged dataset by removing redundant columns during each join operation. Print columns of final dataset.

In [None]:
# TODO: Your code starts here

# TODO: Your code ends here



### 6.4

We have compiled a comprehensive dataset that includes information on financial organization investments, individual investments, startup investments, and product counts under each entity.

    Display the count of train_data. Also, display the schema of train dataset i.e the column names with corresponding data types. 

In [None]:
# Write your code here
# TODO: Your code starts here

# TODO: Your code ends here



In [None]:
# Run this cell to remove unecessary columns:
columns_to_drop = ['parent_id', 'entity_id', 'closed_at', 'funded_object_id']
train_data = train_data.drop(*columns_to_drop)

### 6.5 Converting datatypes of feature columns

Convert all the numeric data columns to float and print them

In [None]:
from pyspark.sql.functions import col

# Columns to convert to float
columns_to_convert = [
    'investment_rounds',
    'invested_companies',
    'funding_rounds',
    'relationships',
    'total_amount_raised',
    'num_acquisitions',
    'fin_org_financed',
    'num_products',
    'startup_financed',
    'person_financed',
    'funding_total_usd',
    'participants',
    'age_of_company',
    'raised_amount_usd',
    'have_been_acquired'
]

In [None]:
# Convert specified columns to float
# TODO: Your code starts here

# TODO: Your code ends here



### 6.6 Transform the 'id' column in the 'train_data' DataFrame by removing any 'c:' prefix.

In [None]:
from pyspark.sql.functions import col, regexp_replace

columns_to_convert = ['id']

In [None]:
# TODO: Your code starts here

# TODO: Your code ends here



## Q7. Spark Mlib: Classification

MLlib(Main Guide): https://spark.apache.org/docs/latest/ml-guide.html

MLlib (DataFrame-based): https://spark.apache.org/docs/latest/api/python/reference/pyspark.ml.html




In this part, you will perform multiclass classification task using Apache Spark's scalable machine learning library(Spark Mlib).

Predict the status of startups by performing a classification analysis on the categorical dependent variable 'Status'

The dependent variable is a categorical one, made up of 4 non-orderable levels, indicating the STATUS of each startup. These levels are:

    CLOSED : failed startup

    ACQUIRED : acquired startup

    IPO : listed startup

    OPERATING : startup not acquired or listed

### 7.1 Fill Missing Values for categorical columns

    Perform Mode imputation for categorical columns like 'category_code', 'country_code'. 

    Mode imputation: Replace missing values with the mode (the most frequently occurring value) of the column.


Hint: Order your column in descending order according to frequency and get the first value.
Methods Used: groupBy(), orderBy(), count(), desc(), first()

In [None]:
# Write your code here
# TODO: Your code starts here

# TODO: Your code ends here



### 7.2 Convert categorical columns to numeric using String Indexer

    String Indexer assigns a unique integer based on label frequencies, with the most frequent label getting index 0.

StringIndexer: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.StringIndexer.html

In [None]:
# Write your code here
# TODO: Your code starts here

# TODO: Your code ends here



In [None]:
# If you've renamed the original columns, then drop them
train_data = train_data.drop(*categorical_columns)

 ### 7.3 Fill the missing values for numerical columns

    First identify the numerical columns. Use the imputer method to fill the missing values using 'mean' strategy for these columns.

Imputer(): https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.Imputer.html


In [None]:
# Write your code here 
# TODO: Your code starts here

# TODO: Your code ends here



In [None]:
#drop unnecessary columns
columns_to_drop = ["id", "participants", "is_first_round", "is_last_round", "acquiring_object_id", "acquired_object_id", "funding_round_type", "funded_at", "funding_round_code", "raised_amount_usd","object_id"]
train_data = train_data.drop(*columns_to_drop)

In [None]:
train_data.columns

### 7.4 Feature vector

    
    Use Pyspark ML's VectorAssembler() which is a feature transformer to convert the feature columns to single column vector. Index the target variable 'status'.

VectorAssembler(): https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.VectorAssembler.html

In [None]:
# Assemble features
# TODO: Your code starts here

# TODO: Your code ends here



### 7.5 Model selection with 5-fold cross-validation (Random Forest vs Logistic Regression)

Split the dataset into **train/test**. On the **training split only**, run **5-fold cross-validation** (Spark ML `CrossValidator`) for **two models** and pick the best model/hyperparameters based on the CV metric. Then evaluate the selected best model on the held-out **test** split (no CV-on-test) and report test accuracy.

RandomForest(): https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.classification.RandomForestClassifier.

LogisticRegression(): https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.classification.LogisticRegression.html

CrossValidator(): https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.tuning.CrossValidator.html

ParamGridBuilder(): https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.tuning.ParamGridBuilder.html

Pipeline(): https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.Pipeline.html

MulticlassClassificationEvaluator(): https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.evaluation.MulticlassClassificationEvaluator.html


In [None]:
# TODO: Your code starts here

# TODO: Your code ends here



### 7.6 Evaluation

    Evaluate the performance of a machine learning model on a multi-class classification problem. Print out precision and recall for each class identified in the model's predictions.

MulticlassMetrics(): https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.mllib.evaluation.MulticlassMetrics.html

In [None]:
# TODO: Your code starts here

# TODO: Your code ends here

