## FINAL PROJECT: ISM 6562 - BIG DATA FOR BUSINESS

### **Setup Steps**

> ⚠️ **Note:** You must have a Java Development Kit (JDK) installed and the `JAVA_HOME` environment variable correctly configured.

- **For Windows**: As of May 3, 2025, the stable JDK version for this setup is **Java 11**.  
- **For macOS**: The stable version is **Java 17**, available from [Adoptium](https://adoptium.net/).
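
Before starting Spark, it can help to confirm that Java is actually visible to Python. The following is a minimal sketch using only the standard library; the helper name `describe_java_setup` is illustrative and not part of the project code.

```python
import os
import shutil
import subprocess


def describe_java_setup():
    """Return JAVA_HOME, the java executable found on PATH, and its version banner."""
    java_home = os.environ.get("JAVA_HOME")  # None if the variable is unset
    java_path = shutil.which("java")         # None if java is not on PATH
    version = None
    if java_path is not None:
        # `java -version` normally writes its banner to stderr, not stdout
        proc = subprocess.run(
            [java_path, "-version"], capture_output=True, text=True, check=False
        )
        banner = (proc.stderr or proc.stdout).strip()
        version = banner.splitlines()[0] if banner else None
    return {"JAVA_HOME": java_home, "java_on_path": java_path, "version": version}


java_info = describe_java_setup()
for key, value in java_info.items():
    print(f"{key}: {value if value is not None else 'not found'}")
```

If `JAVA_HOME` prints as `not found`, Spark may still start when `java` is on the `PATH`, but setting the variable explicitly avoids ambiguity when several JDKs are installed.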

The Python code below checks for required packages and installs any that are missing. This ensures all necessary libraries are available for import in your project.


In [1]:
import importlib.util
import subprocess
import sys

# Map each import name to the name pip expects.
# They differ for scikit-learn: it is imported as 'sklearn' but installed as 'scikit-learn'.
packages = {
    "numpy": "numpy",
    "pandas": "pandas",
    "matplotlib": "matplotlib",
    "seaborn": "seaborn",
    "ucimlrepo": "ucimlrepo",
    "pyspark": "pyspark",
    "sklearn": "scikit-learn",
    "findspark": "findspark",
}

for import_name, pip_name in packages.items():
    if importlib.util.find_spec(import_name) is None:
        print(f"Installing {pip_name}...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", pip_name])
    else:
        print(f"{import_name} is already installed.")


numpy is already installed.
pandas is already installed.
matplotlib is already installed.
seaborn is already installed.
ucimlrepo is already installed.
pyspark is already installed.
sklearn is already installed.
findspark is already installed.


In [1]:
# Importing libraries
import findspark
import pandas as pd
from ucimlrepo import fetch_ucirepo 
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, VectorIndexer, StandardScaler, PCA

### Import Data from UCI ML dataset library
The data is loaded as a pandas DataFrame because that is the format the UCI ML Repository client (`ucimlrepo`) returns. A dataset too large to fit in memory would need a separate ingestion path that reads directly into PySpark.
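
For a larger dataset, one possible ingestion path is to let Spark read the file itself rather than going through pandas. The sketch below assumes a hypothetical CSV export of the data with a header row; `CSV_OPTIONS`, `load_csv_with_spark`, and the file name are illustrative names, not part of this project.

```python
# Reader options for a hypothetical CSV export of the dataset.
CSV_OPTIONS = {
    "header": "true",       # assumption: the first line holds column names
    "inferSchema": "true",  # let Spark infer numeric types (costs an extra pass over the file)
    "nullValue": "?",       # read the '?' missing-value marker as null
}


def load_csv_with_spark(spark, path):
    """Read a CSV straight into a Spark DataFrame, bypassing pandas."""
    return spark.read.options(**CSV_OPTIONS).csv(path)


# Usage (hypothetical path), once a SparkSession named `spark` exists:
# big_df = load_csv_with_spark(spark, "communities.csv")
```

Reading with `nullValue` set this way would also make a separate pass to replace `?` markers unnecessary, at least under the assumptions above.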

In [2]:
findspark.init() 
# If you have a SPARK_HOME environment variable set, it might find it automatically
spark_path = findspark.find()
print(spark_path)

/home/maurb/miniconda3/lib/python3.12/site-packages/pyspark


In [3]:
# fetch dataset 
communities_and_crime = fetch_ucirepo(id=183) 

# data (as pandas dataframes) 
X = communities_and_crime.data.features 
y = communities_and_crime.data.targets 

df = pd.concat([X, y], axis=1)

#### Create Spark Session

In [4]:
spark = SparkSession.builder.appName("CommunitiesCrime").getOrCreate()

25/05/03 20:09:14 WARN Utils: Your hostname, DESKTOP-V063BIC resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/05/03 20:09:14 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/03 20:09:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/05/03 20:09:17 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/05/03 20:09:17 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [5]:
spark_df = spark.createDataFrame(df)
spark_df.show()

25/05/03 20:09:26 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

In [6]:
spark_df.createOrReplaceTempView("raw_crime")

In [7]:
columns = spark_df.columns

sql_expr = ",\n".join(
    [f"NULLIF({col}, '?') AS {col}" for col in columns]
)

query = f"SELECT {sql_expr} FROM raw_crime"
clean_df = spark.sql(query)
clean_df.createOrReplaceTempView("clean_crime")

#### Cleaning Dataset
Drop the identifier columns (`state`, `county`, `community`, `communityname`), which carry no meaningful predictive value. The `fold` column is dropped as well: it only records the cross-validation fold assigned in the original dataset.

In [8]:
cols_to_drop = ["state", "county", "community", "communityname", "fold"]
selected_cols = [c for c in clean_df.columns if c not in cols_to_drop]
clean_df = clean_df.select(*selected_cols)
clean_df.createOrReplaceTempView("final_crime")

In [9]:
numeric_cast_expr = ",\n".join(
    [f"CAST({col} AS DOUBLE) AS {col}" for col in selected_cols]
)

casted_df = spark.sql(f"SELECT {numeric_cast_expr} FROM final_crime")
casted_df.createOrReplaceTempView("tidy_crime")

In [10]:
# This is your clean, tidy, and numeric Spark DataFrame
tidy_df = spark.sql("SELECT * FROM tidy_crime")
tidy_df.show()


##### VectorAssembler
Use case: Combine multiple numeric features into a single feature vector column.

Why it's useful: This is a prerequisite for feeding data into ML models like linear regression or random forest.

In [11]:
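# Every column except the target is a feature
feature_cols = [c for c in tidy_df.columns if c != "ViolentCrimesPerPop"]

# Assemble the features into one vector column; handleInvalid="skip"
# drops any row that has a null in at least one feature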
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features",
    handleInvalid="skip"
)
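
Because `handleInvalid="skip"` discards every row with a null in any feature, it is worth checking how many rows would survive before fitting anything. Below is a minimal pandas sketch of that check on toy data; the helper name `rows_kept_by_skip` and the toy values are illustrative only.

```python
import pandas as pd


def rows_kept_by_skip(pdf, feature_cols):
    """Count rows that have no missing value in any of feature_cols."""
    complete = pdf[feature_cols].notna().all(axis=1)
    return int(complete.sum())


# Toy data: None marks a missing value, as after the NULLIF and CAST steps.
toy = pd.DataFrame(
    {
        "population": [0.19, 0.00, None, 0.04],
        "householdsize": [0.33, 0.16, 0.42, None],
        "ViolentCrimesPerPop": [0.20, 0.67, 0.43, 0.12],
    }
)
kept = rows_kept_by_skip(toy, ["population", "householdsize"])
print(f"{kept} of {len(toy)} rows have every feature present")
```

On the real data, `tidy_df.dropna(subset=feature_cols).count()` should report the equivalent figure directly in Spark. If only a small fraction of rows survives, dropping the sparsest columns or imputing values may be preferable to skipping rows.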

##### StandardScaler
Use case: Normalize features to have zero mean and unit variance.

Why it's useful: Helps gradient-based models like logistic regression or linear regression converge better.
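
The arithmetic behind standardization is small enough to sketch in plain Python. This is an illustration for a single feature, not Spark's implementation. It assumes the sample standard deviation (n - 1 denominator), which is the convention Spark's documentation describes for `StandardScaler`.

```python
import statistics


def standardize(values):
    """Center values to zero mean and scale them to unit sample standard deviation."""
    mean = statistics.mean(values)
    std = statistics.stdev(values)  # sample standard deviation (n - 1 denominator)
    return [(v - mean) / std for v in values]


raw = [2.0, 4.0, 4.0, 6.0]
scaled = standardize(raw)
print(scaled)
```

After this transformation every feature is on the same scale, so no single feature dominates a downstream step such as PCA merely because of its units.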

In [12]:
scaler = StandardScaler(inputCol='features', outputCol='scaledFeatures', withStd=True, withMean=True)
pca = PCA(k=20, inputCol='scaledFeatures', outputCol='pcaFeatures')

##### PCA (Principal Component Analysis)
Use case: Dimensionality reduction.

Why it's useful: The dataset has 100+ features, many of which are correlated. PCA compresses them into a smaller set of uncorrelated components (`k=20` here).
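
To see why correlated features compress well, the NumPy sketch below computes how much variance each principal component captures on synthetic data. The helper `explained_variance_ratio` and the generated data are illustrative assumptions, not the project's pipeline.

```python
import numpy as np


def explained_variance_ratio(X):
    """Share of total variance captured by each principal component."""
    centered = X - X.mean(axis=0)
    # Squared singular values of the centered data are proportional
    # to the variance along each principal component.
    singular_values = np.linalg.svd(centered, compute_uv=False)
    variances = singular_values ** 2
    return variances / variances.sum()


rng = np.random.default_rng(0)
base = rng.normal(size=(200, 1))
# Three features: two nearly identical copies of `base` plus one independent feature.
X = np.hstack(
    [base, base + 0.05 * rng.normal(size=(200, 1)), rng.normal(size=(200, 1))]
)
ratio = explained_variance_ratio(X)
print(np.cumsum(ratio))  # cumulative share of variance by number of components
```

Two of the three synthetic features are almost duplicates, so two components capture nearly all the variance. For the Spark pipeline, the fitted PCA stage exposes the same kind of information through its `explainedVariance` attribute, which can help judge whether `k=20` is a reasonable choice.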

In [13]:
pipeline = Pipeline(stages=[assembler, scaler, pca])
model = pipeline.fit(tidy_df)
result_df = model.transform(tidy_df)

25/05/03 20:09:49 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/05/03 20:09:50 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


In [14]:
result_df.select("pcaFeatures").show(truncate=False)

+-----------+
|pcaFeatures|
+-----------+

In [15]:
result_df.cache()

DataFrame[population: double, householdsize: double, racepctblack: double, racePctWhite: double, racePctAsian: double, racePctHisp: double, agePct12t21: double, agePct12t29: double, agePct16t24: double, agePct65up: double, numbUrban: double, pctUrban: double, medIncome: double, pctWWage: double, pctWFarmSelf: double, pctWInvInc: double, pctWSocSec: double, pctWPubAsst: double, pctWRetire: double, medFamInc: double, perCapInc: double, whitePerCap: double, blackPerCap: double, indianPerCap: double, AsianPerCap: double, OtherPerCap: double, HispPerCap: double, NumUnderPov: double, PctPopUnderPov: double, PctLess9thGrade: double, PctNotHSGrad: double, PctBSorMore: double, PctUnemployed: double, PctEmploy: double, PctEmplManu: double, PctEmplProfServ: double, PctOccupManu: double, PctOccupMgmtProf: double, MalePctDivorce: double, MalePctNevMarr: double, FemalePctDiv: double, TotalPctDiv: double, PersPerFam: double, PctFam2Par: double, PctKids2Par: double, PctYoungKids2Par: double, PctTeen2P

#### Data Visualizations