<a href="https://colab.research.google.com/github/jlopez1288/-Python-projects-Data-201/blob/main/Copy_of_Jennifer_Lopez_HW_04.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Big Data Analytics Homework 04

*Complete this assignment in Google Colab. Prior to submitting a copy of this notebook (.ipynb format), run every cell and ensure you have corrected all runtime errors. Be sure to fill in your Name and SUID in the following cell. As always, you must do your own work. This means you may not use answers to the following questions generated by any other person or a generative AI tool such as ChatGPT. You may, however, discuss this assignment with others in a general way and seek help when you need it, but, again, you must do your own work.*

Name: Jennifer Lopez

SUID: 264179713

### Setup

In [None]:
! pip install pyspark -q

In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

spark = SparkSession\
    .builder\
    .appName('Homework 04')\
    .getOrCreate()

This assignment uses a data set containing information about data science programs at universities worldwide.

The dataset contains many columns that we can use to understand how these data science programs differ from one another.

In [None]:
%%bash
# download the data science programs data set (%%bash must be the first line of the cell)
if [[ ! -f colleges-data-science-programs.csv ]]; then
 wget https://syr-bda.s3.us-east-2.amazonaws.com/colleges-data-science-programs.csv -q
fi

### Q1

Read `colleges-data-science-programs.csv` into a Spark data frame named `raw_ds_programs_text`.

In [None]:
# Import necessary modules/libraries
from pyspark.sql import SparkSession
import os
import urllib.request

# Reuse the Spark session from the Setup cell (getOrCreate returns the existing one)
spark = SparkSession.builder.getOrCreate()

# Define the CSV file path
file_path = "colleges-data-science-programs.csv"

# Download the CSV file if it does not already exist
if not os.path.exists(file_path):
    url = "https://syr-bda.s3.us-east-2.amazonaws.com/colleges-data-science-programs.csv"
    urllib.request.urlretrieve(url, file_path)

# Read the CSV file into a Spark DataFrame, using the first row as column names
raw_ds_programs_text = spark.read.csv(file_path, header=True, inferSchema=True)

In [None]:
# do not modify
print('rows: ', raw_ds_programs_text.count(),
      ', cols:', len(raw_ds_programs_text.columns))

raw_ds_programs_text\
  .show(5)

rows:  222 , cols: 28
+---+--------------------+--------------------+--------------------+-------+-------+-----+------+--------+--------------------+-------------------+-------------------+----------------+------------+-------+----------+------------+-----+----+-------------+----------------+--------+---------+--------------------+--------+---------+---------+------+
| id|                name|                 url|             program| degree|country|state|online|oncampus|          department|         created_at|         updated_at|university_count|program_size|courses|admit_reqs|year_founded|notes|cost|visualization|machine learning|business|databases|programminglanguages|capstone|mapreduce|part-time|ethics|
+---+--------------------+--------------------+--------------------+-------+-------+-----+------+--------+--------------------+-------------------+-------------------+----------------+------------+-------+----------+------------+-----+----+-------------+----------------+--------+---------+--------------------+--------+---------+---------+------+

### Q2

Starting with `raw_ds_programs_text`, create a new data frame named `ds_programs_text` which simply adds a column named `text` to the original data frame.

The `text` column will be a concatenation of the following columns, separated by a space: `program`, `degree`, and `department`. You will find the appropriate function in `pyspark.sql.functions`.

For example, the first row of `ds_programs_text` (ordered by `id`) should give you:

```python
ds_programs_text.orderBy('id').first().text
```

```console
'Data Science Masters Mathematics and Statistics'
```
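A minimal plain-Python sketch of the separator behavior, assuming Spark's documented semantics: `concat_ws` skips null inputs, whereas `concat` returns null as soon as any input is null. The helpers `concat_ws_like` and `concat_like` are hypothetical stand-ins written only for illustration; they are not Spark functions.

```python
# Plain-Python stand-ins for Spark's concat_ws and concat null handling.
# concat_ws_like and concat_like are hypothetical helpers, not Spark APIs.
from typing import Optional


def concat_ws_like(sep: str, *values: Optional[str]) -> str:
    """Join the non-null values with sep, skipping None like concat_ws."""
    return sep.join(v for v in values if v is not None)


def concat_like(*values: Optional[str]) -> Optional[str]:
    """Mimic concat: any None input makes the whole result None."""
    if any(v is None for v in values):
        return None
    return "".join(values)


row = ("Data Science", "Masters", "Mathematics and Statistics")
print(concat_ws_like(" ", *row))

# A row with a missing department still yields text with concat_ws,
# but the plain concatenation collapses to None.
row_missing_dept = ("Data Science", "Masters", None)
print(concat_ws_like(" ", *row_missing_dept))
print(concat_like(*row_missing_dept))
```

This is why `concat_ws` is the safer choice when some programs have no `department` value.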

In [None]:
# your code here
from pyspark.sql.functions import concat_ws

# Create a new DataFrame with the 'text' column added
ds_programs_text = raw_ds_programs_text.withColumn(
    "text",
    concat_ws(" ", raw_ds_programs_text["program"], raw_ds_programs_text["degree"], raw_ds_programs_text["department"])
)


In [None]:
# do not modify
ds_programs_text.select('text')\
  .show(5, truncate = False)

+------------------------------------------------------------------+
|text                                                              |
+------------------------------------------------------------------+
|Data Science Masters Mathematics and Statistics                   |
|Analytics Masters Business and Information Systems                |
|Data Science Masters Computer Science                             |
|Business Intelligence & Analytics Masters Business                |
|Advanced Computer Science(Data Analytics) Masters Computer Science|
+------------------------------------------------------------------+
only showing top 5 rows



### Q3

Create a pipeline named `pipe_features` that creates a new dataframe `ds_features`. The `pipe_features` pipeline should add a column, `features`, to `ds_programs_text` that contains the `tfidf` of the `text` column.

Make sure to create your pipeline using a methodology similar to what was demonstrated in class. Aside from removing stop words and setting a minimum token length of 2, no further restrictions should be imposed on the resulting vocabulary.
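The feature steps can be sketched in plain Python on a toy corpus. This sketch assumes whitespace tokenization with lowercasing (the `RegexTokenizer` defaults), a tiny hypothetical `STOP_WORDS` set in place of Spark's much larger built-in English list, and the smoothed IDF that Spark ML documents, log((m + 1) / (df + 1)) for m documents. It illustrates the idea; it is not a replacement for the Spark pipeline.

```python
# Plain-Python sketch of: tokenize (min length 2) -> remove stop words ->
# count terms over the full vocabulary -> weight by smoothed IDF.
# STOP_WORDS is a tiny hypothetical list, not Spark's default list.
import math
import re
from collections import Counter

STOP_WORDS = {"and", "of", "in", "the"}


def tokenize(text: str, min_len: int = 2) -> list[str]:
    """Lowercase, split on whitespace, keep tokens of at least min_len chars."""
    return [t for t in re.split(r"\s+", text.lower()) if len(t) >= min_len]


def tfidf(docs: list[str]) -> tuple[list[str], list[dict[str, float]]]:
    """Return the full vocabulary and a TF-IDF dict per document."""
    cleaned = [[t for t in tokenize(d) if t not in STOP_WORDS] for d in docs]
    counts = [Counter(tokens) for tokens in cleaned]
    vocab = sorted({t for tokens in cleaned for t in tokens})  # no size cap
    m = len(docs)
    # Document frequency: number of documents containing each term.
    df = {t: sum(1 for c in counts if t in c) for t in vocab}
    # Smoothed IDF in the form documented for Spark ML.
    idf = {t: math.log((m + 1) / (df[t] + 1)) for t in vocab}
    return vocab, [{t: c[t] * idf[t] for t in c} for c in counts]


docs = [
    "Data Science Masters Mathematics and Statistics",
    "Analytics Masters Business and Information Systems",
    "Data Science Masters Computer Science",
]
vocab, weights = tfidf(docs)
print(vocab)
print(weights[2])
```

A term that appears in every document (here `masters`) gets an IDF of 0, so it contributes nothing to that document's TF-IDF vector.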

In [None]:
# your code here that goes into steps

from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, IDF
from pyspark.ml import Pipeline

# Step 1: Tokenize the 'text' column (lowercased, split on whitespace),
# keeping only tokens that are at least 2 characters long
tokenizer = RegexTokenizer(inputCol="text", outputCol="words_token",
                           pattern="\\s+", gaps=True, minTokenLength=2)

# Step 2: Remove English stop words
remover = StopWordsRemover(inputCol="words_token", outputCol="words_cleaned")

# Step 3: Count term frequencies over an explicit vocabulary.
# Unlike HashingTF, CountVectorizer keeps the vocabulary (needed in Q6), and its
# defaults (vocabSize=2**18, minDF=1.0, minTF=1.0) do not restrict a corpus this small.
count_vectorizer = CountVectorizer(inputCol="words_cleaned", outputCol="raw_features")

# Step 4: Rescale the counts by inverse document frequency to get TF-IDF vectors
idf = IDF(inputCol="raw_features", outputCol="features")

# Step 5: Create the pipeline
pipe_features = Pipeline(stages=[tokenizer, remover, count_vectorizer, idf])

# Fit and transform the pipeline on the data
pipe_model = pipe_features.fit(ds_programs_text)
ds_features = pipe_model.transform(ds_programs_text)

In [None]:
# do not modify
ds_features.select('features')\
  .show(5,
        truncate = False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------+
|features                                                                                                                                                 |
+---------------------------------------------------------------------------------------------------------------------------------------------------------+
|(10000,[193,1695,2236,3363,8846],[3.615412302232064,0.6709733230656233,2.462732792293678,1.2962979072868075,0.2538801769623398])                         |
|(10000,[3148,5374,8190,8846,8889],[0.8222042927895469,1.0377239189930973,2.411439497906128,0.2538801769623398,3.009276498661748])                        |
|(10000,[1695,3363,8151,8846],[0.6709733230656233,2.592595814573615,2.411439497906128,0.2538801769623398])                                                |
|(10000,[3148,4532,5374,6328,8846],[0.8222042927895469,2.3161293

### Q4

Create a pipeline model called `pipe_pca` that computes the first two principal components of the `features` column created by `pipe_features`, and creates a new column named `scores`.

Use `pipe_pca` to create a data frame, `ds_features_1` with the columns `id`, `name`, `url`, and `scores`.

Note: Prior to computing PCA scores, you will want to scale the TF-IDF outputs. Refer to lecture notes regarding the appropriate parameters to use during this step.
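As a sketch of what the scaling step does to each column, the hypothetical helper `standard_scale` below mimics `StandardScaler`: `with_mean` subtracts the column mean and `with_std` divides by the corrected (n - 1) sample standard deviation, returning 0.0 for zero-variance columns, as Spark's documentation describes. Which combination the lecture notes call for is not stated here, so treat the flags as assumptions. In Spark, `withMean` defaults to `False` and `withStd` to `True`, and `withMean=True` turns sparse TF-IDF vectors into dense ones.

```python
# Column-wise centering/scaling on a list-of-lists matrix.
# standard_scale is a hypothetical helper, not a Spark API.
import math


def standard_scale(rows, with_mean=True, with_std=True):
    """Center and/or scale each column like StandardScaler."""
    n = len(rows)
    cols = list(zip(*rows))
    means = [sum(c) / n for c in cols]
    # Corrected (n - 1) sample standard deviation per column.
    stds = [math.sqrt(sum((x - mu) ** 2 for x in c) / (n - 1))
            for c, mu in zip(cols, means)]
    out = []
    for row in rows:
        new_row = []
        for x, mu, sd in zip(row, means, stds):
            value = x - mu if with_mean else x
            if with_std:
                # Zero-variance columns map to 0.0 instead of dividing by zero.
                value = value / sd if sd > 0 else 0.0
            new_row.append(value)
        out.append(new_row)
    return out


rows = [[1.0, 0.0], [2.0, 0.0], [3.0, 3.0]]
print(standard_scale(rows, with_mean=True, with_std=False))
print(standard_scale(rows))
```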

In [None]:
from pyspark.ml.feature import StandardScaler, PCA
from pyspark.ml import Pipeline

# Step 1: Scale the TF-IDF features
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withMean=True, withStd=True)

# Step 2: Apply PCA to reduce to 2 principal components
pca = PCA(k=2, inputCol="scaled_features", outputCol="scores")

# Step 3: Create a pipeline with scaler and PCA
pipe_pca = Pipeline(stages=[scaler, pca])

# Step 4: Fit the pipeline on ds_features and transform it
pipe_pca_model = pipe_pca.fit(ds_features)
ds_pca = pipe_pca_model.transform(ds_features)

# Step 5: Select the required columns
ds_features_1 = ds_pca.select("id", "name", "url", "scores")

# Display result
ds_features_1.select("scores").show(5, truncate=False)


ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/IPython/core/interactiveshell.py", line 3553, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-10-1662353338>", line 14, in <cell line: 0>
    pipe_pca_model = pipe_pca.fit(ds_features)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/pyspark/ml/base.py", line 205, in fit
    return self._fit(dataset)
           ^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/pyspark/ml/pipeline.py", line 134, in _fit
    model = stage.fit(dataset)
            ^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/pyspark/ml/base.py", line 205, in fit
    return self._fit(dataset)
           ^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/pyspark/ml/wrapper.py", line 381, in _fit
    java_model = self._fit_java(dataset)
                 ^^^^^^^

ConnectionRefusedError: [Errno 111] Connection refused

In [None]:
# do not modify
ds_features_1\
  .select('scores')\
  .show(5,
        truncate = False)

### Q5

In this question you will write code that makes recommendations for programs closest to a program of interest.  

Create a function named `get_nearest_programs` that returns the 3 closest programs to a program of interest.

The `get_nearest_programs` function should take 1 argument: `program_of_interest`. Write the function so that it returns the 3 programs (as defined by the `name` column) closest to the program of interest, as measured by Euclidean (L2) distance. Do not return the program of interest as one of the names.

Your function should **not** consider **Bachelors** programs.
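A plain-Python sketch of the same logic, assuming toy rows of `(name, degree, scores)` with made-up university names and 2-D scores: drop Bachelors programs, look up the program of interest, rank the remaining programs by Euclidean distance (`math.dist`, Python 3.8+), and keep the 3 nearest names. `nearest_programs` is a hypothetical helper for illustration only.

```python
# Toy nearest-neighbor lookup on hypothetical (name, degree, scores) rows.
import math

programs = [
    ("Univ A", "Masters", (0.0, 0.0)),
    ("Univ B", "Masters", (1.0, 0.0)),
    ("Univ C", "Bachelors", (0.1, 0.1)),
    ("Univ D", "Masters", (0.0, 2.0)),
    ("Univ E", "Masters", (3.0, 4.0)),
    ("Univ F", "Certificate", (0.5, 0.5)),
]


def nearest_programs(program_of_interest, rows, n=3):
    """Return the n names closest (L2) to program_of_interest, skipping Bachelors."""
    eligible = [r for r in rows if "Bachelor" not in (r[1] or "")]
    target = next((s for name, _, s in eligible if name == program_of_interest), None)
    if target is None:
        return []
    # Sort (distance, name) pairs, excluding the program of interest itself.
    ranked = sorted(
        (math.dist(scores, target), name)
        for name, _, scores in eligible
        if name != program_of_interest
    )
    return [name for _, name in ranked[:n]]


print(nearest_programs("Univ A", programs))
```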

In [None]:
# your code here
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DoubleType


def get_nearest_programs(program_of_interest):
    '''
    This function returns the 3 closest programs to a given program by calculating
    the Euclidean (L2) distance between the PCA scores of the selected program and
    the scores of the remaining non-Bachelors programs
    '''
    # ds_features_1 has no degree column, so join it back on id to drop
    # Bachelors programs (programs with a null degree are kept)
    degrees = raw_ds_programs_text.select("id", "degree")
    grad_programs = ds_features_1.join(degrees, on="id")\
        .filter(col("degree").isNull() | ~col("degree").contains("Bachelor"))

    # Get the score vector for the program of interest
    poi_row = grad_programs.filter(col("name") == program_of_interest)\
        .select("scores").first()

    if poi_row is None:
        print(f"Program '{program_of_interest}' not found.")
        return []

    poi_vec = poi_row["scores"]

    # UDF arguments must be columns, so capture poi_vec in the closure instead
    distance_udf = udf(lambda v: float(v.squared_distance(poi_vec)) ** 0.5, DoubleType())

    # Exclude the program of interest and keep the 3 closest programs
    nearest_programs = grad_programs\
        .filter(col("name") != program_of_interest)\
        .withColumn("distance", distance_udf(col("scores")))\
        .orderBy("distance")\
        .limit(3)

    return [row["name"] for row in nearest_programs.collect()]

In [None]:
# do not modify
get_nearest_programs('Syracuse University')

### Q6

Create two Pandas dataframes, `pc1` and `pc2`, with the columns `word` and `absolute_loading`, containing the top 5 absolute loadings (in descending order) for the first and second principal components, respectively.
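Ranking words for one component can be sketched as below, assuming a hypothetical vocabulary and loading vector. In Spark, the loadings for component j are column j of `PCAModel.pc.toArray()`, and position i lines up with word i of a `CountVectorizerModel` vocabulary; `top_absolute_loadings` is a hypothetical helper.

```python
# Rank words by the absolute value of their loading on one component.
# The vocabulary and loadings are made up for illustration.
def top_absolute_loadings(vocab, loadings, k=5):
    """Return (word, |loading|) pairs for the k largest absolute loadings."""
    pairs = [(word, abs(value)) for word, value in zip(vocab, loadings)]
    return sorted(pairs, key=lambda p: p[1], reverse=True)[:k]


vocab = ["data", "science", "analytics", "business", "computer", "statistics"]
pc1_loadings = [0.10, -0.60, 0.30, -0.05, 0.70, -0.20]
print(top_absolute_loadings(vocab, pc1_loadings))
```

Taking the absolute value matters because a large negative loading is as influential as a large positive one; the sign only sets the direction along the component.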

In [None]:
# your code here
import pandas as pd
import numpy as np

# Step 1: Vocabulary from the fitted feature pipeline (this requires a
# CountVectorizer stage; HashingTF keeps no vocabulary). Word i <-> feature i.
vocab = next(stage.vocabulary for stage in pipe_model.stages
             if hasattr(stage, "vocabulary"))

# Step 2: PCA loading matrix from the fitted Q4 pipeline.
# pc has shape (num_features, 2): each COLUMN is one principal component.
pca_model = pipe_pca_model.stages[-1]
components = pca_model.pc.toArray()

# Step 3: Extract the loadings for PC1 and PC2 (columns, not rows)
pc1_loadings = components[:, 0]
pc2_loadings = components[:, 1]

# Step 4: Keep the 5 largest absolute loadings for a component
def top_loadings(loadings, n=5):
    return pd.DataFrame({
        "word": vocab,
        "absolute_loading": np.abs(loadings)
    }).sort_values(by="absolute_loading", ascending=False)\
      .head(n)\
      .reset_index(drop=True)

# Step 5: Final output
pc1 = top_loadings(pc1_loadings)
pc2 = top_loadings(pc2_loadings)

In [None]:
# do not modify
display(pc1.head())
display(pc2.head())

### Q7

Create a new pipeline called `pipe_pca_1` where you fit the maximum possible number of principal components for this dataset.

Create a scree plot and a plot of cumulative variance explained (exactly 2 plots).

Answer the following:

1. How many principal components were you able to create (the maximum number)?

2. Based on either the scree or cumulative variance explained plot, how many principal components would you use if you were building a supervised machine learning model, and why?
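One common way to read the cumulative curve is to pick the smallest number of components whose cumulative proportion of variance reaches a chosen threshold. The sketch below assumes made-up proportions (Spark's `PCAModel.explainedVariance` holds proportions in the same form) and a 0.90 threshold chosen only for illustration; the right cutoff is a judgment call. `components_for_threshold` is a hypothetical helper.

```python
# Cumulative explained variance and the first k that reaches a threshold.
# The proportions and the 0.90 threshold are hypothetical, not results.
from itertools import accumulate


def components_for_threshold(explained, threshold=0.90):
    """Return (k, cumulative list); k is the first count reaching threshold."""
    cumulative = list(accumulate(explained))
    for k, total in enumerate(cumulative, start=1):
        if total >= threshold:
            return k, cumulative
    # Threshold never reached: all components are needed.
    return len(cumulative), cumulative


explained = [0.42, 0.25, 0.15, 0.10, 0.05, 0.03]
k, cumulative = components_for_threshold(explained)
print(k, [round(c, 2) for c in cumulative])
```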

In [None]:
# your code for new pipeline here
from pyspark.ml.feature import PCA, StandardScaler
from pyspark.ml import Pipeline
import matplotlib.pyplot as plt
import numpy as np

# Step 1: Spark's PCA accepts at most k = number of TF-IDF features
num_features = ds_features.select("features").first()["features"].size

# Step 2: Define a new pipeline with PCA using the maximum number of components
scaler_1 = StandardScaler(inputCol="features", outputCol="scaled_features", withMean=True, withStd=True)
pca_full = PCA(k=num_features, inputCol="scaled_features", outputCol="scores")

pipe_pca_1 = Pipeline(stages=[scaler_1, pca_full])

# Step 3: Fit the pipeline and get the fitted PCA model
pipe_pca_1_model = pipe_pca_1.fit(ds_features)
pca_model_full = pipe_pca_1_model.stages[-1]

# Step 4: Proportion of variance explained by each component
# (plotted in the two cells below, so this cell draws nothing)
explained_variance = pca_model_full.explainedVariance.toArray()
print("number of principal components:", len(explained_variance))

In [None]:
# your code for scree plot here
# Scree plot: proportion of variance explained by each component
plt.figure(figsize=(7, 5))
plt.plot(np.arange(1, len(explained_variance) + 1), explained_variance, marker='o')
plt.title('Scree Plot')
plt.xlabel('Principal Component')
plt.ylabel('Proportion of Variance Explained')
plt.show()

In [None]:
# your code for cumulative variance explained plot here
# Cumulative proportion of variance explained
cumulative_variance = np.cumsum(explained_variance)
plt.figure(figsize=(7, 5))
plt.plot(np.arange(1, len(cumulative_variance) + 1), cumulative_variance, marker='o')
plt.title('Cumulative Variance Explained')
plt.xlabel('Number of Principal Components')
plt.ylabel('Cumulative Proportion of Variance Explained')
plt.show()

*your answers here*

### Q8

Starting with `pipe_pca_1` from the previous question, use the fitted pipeline to transform the data and save the resulting dataframe to a variable named `pca_fun`.

Extract the output of the standard scaler column from the first row of `pca_fun` and store it in a variable named `row1_centered`.

Manually compute 5 PCA scores by projecting `row1_centered` onto the first 5 loading vectors computed by your PCA object. Save the 5 projected PCA scores in a variable called `proj_scores`.

Extract the first 5 PCA scores from the first row of the `pca_fun` scores column and save them in a variable named `pca_fun_scores`.
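The manual projection reduces to one dot product per component. A plain-Python sketch, assuming a hypothetical 3 x 3 orthonormal loading matrix whose columns are the components; with NumPy the same computation is `row @ loadings[:, :k]`. `project` is a hypothetical helper.

```python
# Score on component j = sum_i row[i] * loadings[i][j], for j < k.
# The loading matrix and the row are hypothetical.
import math


def project(row, loadings, k):
    """Project a centered row onto the first k loading columns."""
    return [sum(x * loadings[i][j] for i, x in enumerate(row)) for j in range(k)]


s = 1 / math.sqrt(2)
loadings = [  # columns are unit-length, mutually orthogonal components
    [s, -s, 0.0],
    [s, s, 0.0],
    [0.0, 0.0, 1.0],
]
row_centered = [1.0, 3.0, -2.0]
print(project(row_centered, loadings, k=2))
```

Spark's `PCAModel` applies this same multiplication to the scaler output without re-centering, which is why the manual scores and the `scores` column should agree when the scaler already centered the data.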

In [None]:
# your answer here
import numpy as np

# Transform ds_features with the fitted pipe_pca_1 model
pca_fun = pipe_pca_1_model.transform(ds_features)

# Take the first row once so the scaled features and scores come from the same program
first_row = pca_fun.select("scaled_features", "scores").first()

# Extract the scaled features (StandardScaler output) from the first row
row1_centered = first_row["scaled_features"].toArray()

# Get the fitted PCA model from the pipeline
pca_model_full = pipe_pca_1_model.stages[-1]

# The loading matrix is (n_features, k): each column is a principal component
loadings_matrix = pca_model_full.pc.toArray()
first_5_loadings = loadings_matrix[:, :5]  # shape: (n_features, 5)

# Manually project: dot product of the centered row with each loading vector
proj_scores = np.dot(row1_centered, first_5_loadings)

# Extract the first 5 PCA scores that Spark computed for the same row
pca_fun_scores = first_row["scores"].toArray()[:5]

In [None]:
# do not modify
print(proj_scores)
print(pca_fun_scores)

### Q9

Perform an **inverse transform** on the `proj_scores` variable and store the result in a variable named `inverse`.

The grading cell below prints `inverse` and the original `row1_centered` data such that they are right next to each other.

If `inverse` is different from `row1_centered`, explain why. How could you modify the forward and reverse transformation process so that the resulting `inverse` data almost exactly matches `row1_centered`?
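The inverse transform can be explored on a toy setup: project with the first k columns of a hypothetical orthonormal loading matrix, then map back with `scores @ loadings[:, :k].T`. In this sketch, keeping fewer components than the data's dimension loses the part of the row that lies along the dropped directions, while keeping all of them reconstructs the row up to rounding because the full square loading matrix is orthogonal. `project` and `reconstruct` are hypothetical helpers.

```python
# Forward projection and inverse transform with k or all components.
# The loading matrix and the row are hypothetical.
import math


def project(row, loadings, k):
    """Score on component j = sum_i row[i] * loadings[i][j], for j < k."""
    return [sum(x * loadings[i][j] for i, x in enumerate(row)) for j in range(k)]


def reconstruct(scores, loadings):
    """x_hat[i] = sum_j scores[j] * loadings[i][j] over the kept components."""
    return [sum(sc * loadings[i][j] for j, sc in enumerate(scores))
            for i in range(len(loadings))]


s = 1 / math.sqrt(2)
loadings = [[s, -s, 0.0], [s, s, 0.0], [0.0, 0.0, 1.0]]
row_centered = [1.0, 3.0, -2.0]

partial = reconstruct(project(row_centered, loadings, k=2), loadings)
full = reconstruct(project(row_centered, loadings, k=3), loadings)
print(partial)  # the third coordinate is lost
print(full)     # matches row_centered up to floating-point rounding
```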

In [None]:
# your code here
import numpy as np

# Take the first 5 loading vectors
first_5_loadings = pca_model_full.pc.toArray()[:, :5]

# Inverse transform (back to scaled feature space)
inverse = np.dot(proj_scores, first_5_loadings.T)


*your answer here*

In [None]:
# do not modify
print(row1_centered[0:5])
print(inverse[0:5])

### Q10

Implement your modification so that `row1_centered` and `inverse` match almost exactly.

In [None]:
# your code here
# Get the full loading matrix (all components)
full_loadings = pca_model_full.pc.toArray()  # shape: [num_features, num_components]

# Project row1_centered onto all components
proj_scores_full = np.dot(row1_centered, full_loadings)

# Reconstruct the original scaled vector using all components
inverse = np.dot(proj_scores_full, full_loadings.T)


In [None]:
# do not modify
print(row1_centered[0:5])
print(inverse[0:5])