In [16]:
# Import Dependencies
import os

import pandas as pd
import tensorflow as tf
from matplotlib import pyplot as plt
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
from sklearn.preprocessing import StandardScaler

In [17]:
# Spark release to install, e.g. 'spark-3.0.2'. The same string is used to build the
# download URL, the tarball name and SPARK_HOME below, so it must match a published release.
# NOTE(review): confirm this release is still available at the download URL used below.
spark_version = 'spark-3.0.2'
# Export the version so the `!` shell lines below can expand $SPARK_VERSION
os.environ['SPARK_VERSION']=spark_version

# Install Java, download and unpack Spark, and install findspark
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
# JAVA_HOME points at the JDK installed above; SPARK_HOME at the unpacked tarball under /content
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Initialise findspark (imported here because it is only installed by the pip line above).
# The SparkSession itself is created in a later cell.
import findspark
findspark.init()

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
0% [Connecting to archive.ubuntu.com (91.189.88.152)] [1 InRelease 14.2 kB/88.70% [Connecting to archive.ubuntu.com (91.189.88.152)] [Connecting to cloud.r-pr0% [1 InRelease gpgv 88.7 kB] [Connecting to archive.ubuntu.com (91.189.88.152)                                                                               Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [1 InRelease gpgv 88.7 kB] [Waiting for headers] [Connecting to cloud.r-proj                                                                               Hit:3 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Hit:4 http://archive.ubuntu.com/ubuntu bionic InRelease
Ign:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Relea

In [18]:
# Create (or reuse) the Spark session for this notebook, with the PostgreSQL
# JDBC driver jar added to the driver classpath.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("CloudETL")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.16.jar")
    .getOrCreate()
)

In [19]:

# Fetch the merged murders/county CSV from S3 through Spark's file distribution,
# then parse it with a header row and inferred column types.
from pyspark import SparkFiles

url = "https://lgodleski-bucket.s3.us-east-2.amazonaws.com/murders_county_merged.csv"
spark.sparkContext.addFile(url)

# addFile stores the download under its base name; resolve the local path to it
local_csv_path = SparkFiles.get("murders_county_merged.csv")
murders_county_merged_df = spark.read.csv(
    local_csv_path,
    sep=",",
    header=True,
    inferSchema=True,
)
murders_county_merged_df.show()

+--------+-------+--------+------+------+--------+-----+-----+------+-----+-------+-------+------+---------+------------+---------------+-------+------------+--------+------------+
|censusid| county|totalpop|   men| women|hispanic|white|black|native|asian|pacific|citizen|income|incomeerr|incomepercap|incomepercaperr|poverty|childpoverty|employed|unemployment|
+--------+-------+--------+------+------+--------+-----+-----+------+-----+-------+-------+------+---------+------------+---------------+-------+------------+--------+------------+
|    6001|Alameda| 1584983|776699|808284|    22.6| 33.0| 11.3|   0.3| 27.5|    0.8|1025865| 75619|      613|       37285|            279|   12.5|        15.2|  778132|         8.3|
|    6001|Alameda| 1584983|776699|808284|    22.6| 33.0| 11.3|   0.3| 27.5|    0.8|1025865| 75619|      613|       37285|            279|   12.5|        15.2|  778132|         8.3|
|    6001|Alameda| 1584983|776699|808284|    22.6| 33.0| 11.3|   0.3| 27.5|    0.8|1025865| 756

In [23]:
# Convert PySpark DataFrame to Pandas DataFrame so the scikit-learn / TensorFlow
# cells below can work on it.
murders_df = murders_county_merged_df.toPandas()
# Show the column dtypes; the "object" columns are the ones picked up as categorical later
murders_df.dtypes

censusid             int32
county              object
totalpop             int32
men                  int32
women                int32
hispanic           float64
white              float64
black              float64
native             float64
asian              float64
pacific            float64
citizen              int32
income               int32
incomeerr            int32
incomepercap         int32
incomepercaperr      int32
poverty            float64
childpoverty       float64
employed             int32
unemployment       float64
dtype: object

In [None]:
# Collect the names of all object-dtype columns; these are treated as the
# categorical variables to one-hot encode.
column_dtypes = murders_df.dtypes
is_object_column = column_dtypes == "object"
murder_cat = column_dtypes[is_object_column].index.tolist()
murder_cat

In [None]:
# One-hot encode the categorical columns listed in `murder_cat` into a dense DataFrame.
#
# scikit-learn renamed the `sparse` argument to `sparse_output` and replaced
# `get_feature_names` with `get_feature_names_out`; the old spellings were later removed.
# Try the current API first and fall back to the legacy one so the cell runs on either.
try:
    enc = OneHotEncoder(sparse_output=False)
except TypeError:
    # Older scikit-learn: `sparse_output` is not a known keyword
    enc = OneHotEncoder(sparse=False)

# Fit and transform the OneHotEncoder using the categorical variable list
encoded_values = enc.fit_transform(murders_df[murder_cat])

# Look up the generated column names (e.g. "<column>_<category>")
if hasattr(enc, "get_feature_names_out"):
    encoded_names = enc.get_feature_names_out(murder_cat)
else:
    encoded_names = enc.get_feature_names(murder_cat)

# Reuse murders_df's index so the index-based merge in the next step lines rows up
# even if murders_df does not have a default 0..n-1 index.
encode_df = pd.DataFrame(encoded_values, columns=encoded_names, index=murders_df.index)
encode_df.head()

In [None]:
# Merge one-hot encoded features (joined on the row index) and drop the original
# categorical columns.
# NOTE: this cell reassigns murders_df, so it is meant to be run once per kernel session;
# re-running it without re-running the cells above will fail because the original
# categorical columns are already gone.
murders_df = murders_df.merge(encode_df, left_index=True, right_index=True)
# Use the `columns=` keyword: passing the axis positionally (`drop(labels, 1)`) is
# rejected by current pandas releases.
murders_df = murders_df.drop(columns=murder_cat)
murders_df.head()

In [None]:
# Separate the 'CrimeSolved' target from the feature columns.
# Both must come from murders_df (the original referenced an undefined `loans_df`).
# NOTE(review): the schema printed when the CSV was loaded does not list a 'CrimeSolved'
# column — confirm the loaded dataset actually contains the target before running.
y = murders_df["CrimeSolved"]
X = murders_df.drop(columns=["CrimeSolved"])

# Split training/test datasets, stratified on the target so both splits keep
# the same class balance; fixed random_state for reproducibility
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=89, stratify=y)

# Create a StandardScaler instance
scaler = StandardScaler()

# Fit the StandardScaler on the training split only, so no test-set statistics leak in
X_scaler = scaler.fit(X_train)

# Scale both splits with the training-set statistics
X_train_scaled = X_scaler.transform(X_train)
X_test_scaled = X_scaler.transform(X_test)

## Random Forest Model 

In [None]:
# Train a 500-tree random forest (fixed seed) on the scaled training data
rf_model = RandomForestClassifier(n_estimators=500, random_state=89).fit(X_train_scaled, y_train)

# Predict on the held-out split, then report train/test scores and accuracy
y_pred = rf_model.predict(X_test_scaled)
rf_train_score = rf_model.score(X_train_scaled, y_train)
print(f'Random forest training score: {rf_train_score}')
rf_test_score = rf_model.score(X_test_scaled, y_test)
print(f'Random forest testing score: {rf_test_score}')
rf_accuracy = accuracy_score(y_test, y_pred)
print(f" Random forest predictive accuracy: {rf_accuracy:.3f}")

In [None]:
# Print the fitted forest's per-feature importances and draw them as a bar chart,
# one bar per feature in column order (the values are not sorted).
features = rf_model.feature_importances_
print(features)

bar_positions = range(len(features))
plt.bar(bar_positions, features)
plt.show()

## Logistic Regression Model

In [None]:
# Create a logistic regression model (scikit-learn defaults).
logr_model = LogisticRegression()

# Fitting the model on the scaled training data.
logr_model.fit(X_train_scaled, y_train)

# Predict with THIS model. The original reused `y_pred` from the random forest cell,
# so the "predictive accuracy" line reported the random forest's accuracy instead.
logr_pred = logr_model.predict(X_test_scaled)

# Evaluate the model.
print(f'Logistic regression training Score: {logr_model.score(X_train_scaled, y_train)}')
print(f'Logistic regression testing Score: {logr_model.score(X_test_scaled, y_test)}')
print(f" Logistic regression predictive accuracy: {accuracy_score(y_test,logr_pred):.3f}")

## Deep Neural Net Model 

In [None]:
# Define the model - deep neural net
# Seed TensorFlow so weight initialisation and shuffling are repeatable, matching the
# fixed random_state used for the split and the random forest.
tf.random.set_seed(89)

# One input per scaled feature column
number_input_features = len(X_train_scaled[0])
# Hidden layer widths are tunable hyperparameters; adjust them for the number of
# feature columns left after one-hot encoding.
hidden_nodes_layer1 = 24
hidden_nodes_layer2 = 12

nn = tf.keras.models.Sequential()

# First hidden layer
nn.add(
    tf.keras.layers.Dense(units=hidden_nodes_layer1, input_dim=number_input_features, activation="relu")
)

# Second hidden layer
nn.add(tf.keras.layers.Dense(units=hidden_nodes_layer2, activation="relu"))

# Output layer: a single sigmoid unit for binary classification
# NOTE(review): assumes y_train is a numeric 0/1 target — confirm before training
nn.add(tf.keras.layers.Dense(units=1, activation="sigmoid"))

# Compile the Sequential model together and customize metrics
nn.compile(loss="binary_crossentropy", optimizer="adam", metrics=["accuracy"])

# Train the model
fit_model = nn.fit(X_train_scaled, y_train, epochs=50)

# Evaluate the model using the test data
model_loss, model_accuracy = nn.evaluate(X_test_scaled, y_test, verbose=2)

# Keras models have no scikit-learn style `.score()` (and `nn_model` was never defined),
# so obtain the training accuracy through `evaluate()` as well.
_, train_accuracy = nn.evaluate(X_train_scaled, y_train, verbose=0)

print(f'deep neural net training Score: {train_accuracy}')
print(f'deep neural net testing Score: {model_accuracy}')
print(f"Loss: {model_loss}, Accuracy: {model_accuracy}")