<a href="https://colab.research.google.com/github/joshuajonme/machine-learning-tech-hubs/blob/aws-features/ml_processing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import os
# Find the latest version of spark 3.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.1'
spark_version = 'spark-3.0.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:6 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Get:7 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Hit:9 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:11 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Get:12 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease [21.3 kB]
Get:13 http://security.ubuntu.com/ubuntu bionic-security/universe amd64 Packages [1,370 kB]
Get:14 http://archive.ubuntu.com/ubuntu bionic-backpor

In [2]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2020-12-05 23:03:48--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2020-12-05 23:03:50 (1.02 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [3]:
from pyspark.sql import SparkSession

# Create (or reuse, via getOrCreate) the notebook's SparkSession with the downloaded
# PostgreSQL JDBC jar on the driver classpath — presumably for a later JDBC read/write; confirm.
spark = (
    SparkSession.builder
    .appName("tech-hub-ml")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar")
    .getOrCreate()
)

In [51]:
from pyspark import SparkFiles

# Load raw-census.csv from S3 into a Spark DataFrame.
# addFile ships the object to the Spark nodes and SparkFiles.get resolves the local copy by
# file name, so derive that name from the URL rather than repeating it by hand.
census_url = "https://tech-hub-ml.s3.amazonaws.com/raw-census.csv"
spark.sparkContext.addFile(census_url)
census_file = census_url.rsplit("/", 1)[-1]

# Spark datetime patterns are case-sensitive: "MM" is month, while "mm" is minute-of-hour.
df_census = spark.read.option('header', 'true').csv(
    SparkFiles.get(census_file), inferSchema=True, sep=',', timestampFormat="MM/dd/yy"
)
df_census.show(10)

+-----------------+---------------+-----------------+----------------------+---------------+-----------------+---------------------+--------------------+--------+
|Income per capita|Median Age Male|Median Age Female|Bachelor's degree > 25|Total Education|Median Home Value|Public transportation|Total transportation|Zip Code|
+-----------------+---------------+-----------------+----------------------+---------------+-----------------+---------------------+--------------------+--------+
|             6999|           39.5|             41.3|                  1781|          11838|            86200|                    0|                3453|     601|
|             9277|           41.9|             42.8|                  3692|          27411|            86300|                   32|               11444|     602|
|            11307|           39.1|             42.9|                  5888|          34274|           122400|                   35|               11908|     603|
|             5943|   

In [52]:
from pyspark.sql import functions as func

# Replace each raw count with its share (in %, rounded to 2 dp) of the matching population total.
# NOTE(review): under Spark's default (non-ANSI) settings a zero total yields null rather than an
# error — confirm nulls are acceptable downstream.
# This cell is not re-runnable on its own: the totals are dropped below, so re-run the census load first.
pct_columns = {
    "Bachelor's degree > 25": "Total Education",
    "Public transportation": "Total transportation",
}
for count_col, total_col in pct_columns.items():
    df_census = df_census.withColumn(
        count_col,
        func.round(100 * func.col(count_col) / func.col(total_col), 2),
    )

# The totals were only needed as denominators
df_census = df_census.drop(*pct_columns.values())
df_census.show()

+-----------------+---------------+-----------------+----------------------+-----------------+---------------------+--------+
|Income per capita|Median Age Male|Median Age Female|Bachelor's degree > 25|Median Home Value|Public transportation|Zip Code|
+-----------------+---------------+-----------------+----------------------+-----------------+---------------------+--------+
|             6999|           39.5|             41.3|                 15.04|            86200|                  0.0|     601|
|             9277|           41.9|             42.8|                 13.47|            86300|                 0.28|     602|
|            11307|           39.1|             42.9|                 17.18|           122400|                 0.29|     603|
|             5943|           43.2|             43.4|                  6.89|            91600|                  0.0|     606|
|            10220|           40.4|             44.2|                 13.03|            88600|                 0.27|  

In [71]:
# Imported here too so this cell does not silently depend on earlier cells' imports
from pyspark import SparkFiles
from pyspark.sql import functions as func

# Load real_estate.csv from S3 into a Spark DataFrame.
# NOTE(review): this bucket ("techhubml") is spelled differently from the census bucket
# ("tech-hub-ml") — confirm both names are intended.
real_estate_url = "https://techhubml.s3.amazonaws.com/real_estate.csv"
spark.sparkContext.addFile(real_estate_url)
real_estate_file = real_estate_url.rsplit("/", 1)[-1]

# Spark datetime patterns are case-sensitive: "MM" is month, while "mm" is minute-of-hour.
df_real_estate = spark.read.option('header', 'true').csv(
    SparkFiles.get(real_estate_file), inferSchema=True, sep=',', timestampFormat="MM/dd/yy"
)

# Round Average RE Prices to two decimal places
df_real_estate = df_real_estate.withColumn("Average RE Prices", func.round(func.col("Average RE Prices"), 2))
df_real_estate.show(10)

+--------+-----+----------+--------------------+-------------------+-----------------+
|Zip Code|State|      City|               Metro|         CountyName|Average RE Prices|
+--------+-----+----------+--------------------+-------------------+-----------------+
|     501|   NY|Holtsville|New York-Newark-J...|     Suffolk County|        128202.92|
|     602|   AL|    Auburn|      Auburn-Opelika|         Lee County|        114105.25|
|     605|   AL| Aguadilla|              Dothan|     Houston County|         78791.83|
|     606|   MO|    Neosho|              Joplin|      Newton County|         59377.75|
|     610|   KY|    Anasco|                null|        Pike County|        108219.58|
|     617|   AL|    Imbery|          Enterprise|      Coffee County|         101022.0|
|     693|   PA|Greensburg|          Pittsburgh|Westmoreland County|         330761.5|
|     705|   UT|  Aibonito|                null|        Kane County|        129276.08|
|     734|   TX|     Ponce|             Mid

In [77]:
from pyspark.sql import functions as func
from pyspark.sql.types import StringType

# Inner join: keep only zip codes present in both the real-estate and census data.
merged_df = df_real_estate.join(df_census, on=["Zip Code"])

# Restore the leading zeros lost because Zip Code was read as a number (e.g. 602 -> "00602").
# format_string already produces a string column, so no extra cast to StringType is needed.
merged_df = merged_df.withColumn("Zip Code", func.format_string("%05d", "Zip Code"))

# NOTE(review): the sample output pairs Puerto Rico-range zips (00602, 00606, ...) with AL/MO/KY
# states and counties — verify the real-estate zip column before trusting this join.
merged_df.show(10)

+--------+-----+----------+--------------+-------------------+-----------------+-----------------+---------------+-----------------+----------------------+-----------------+---------------------+
|Zip Code|State|      City|         Metro|         CountyName|Average RE Prices|Income per capita|Median Age Male|Median Age Female|Bachelor's degree > 25|Median Home Value|Public transportation|
+--------+-----+----------+--------------+-------------------+-----------------+-----------------+---------------+-----------------+----------------------+-----------------+---------------------+
|   00602|   AL|    Auburn|Auburn-Opelika|         Lee County|        114105.25|             9277|           41.9|             42.8|                 13.47|            86300|                 0.28|
|   00606|   MO|    Neosho|        Joplin|      Newton County|         59377.75|             5943|           43.2|             43.4|                  6.89|            91600|                  0.0|
|   00610|   KY|    

In [78]:
# Build the feature table by removing Metro and Median Home Value from the merged data.
# NOTE(review): Median Home Value is presumably excluded because it closely tracks
# Average RE Prices — confirm the intent.
dropped_columns = ["Metro", "Median Home Value"]
features_df = merged_df.drop(*dropped_columns)
features_df.show()

+--------+-----+------------+-------------------+-----------------+-----------------+---------------+-----------------+----------------------+---------------------+
|Zip Code|State|        City|         CountyName|Average RE Prices|Income per capita|Median Age Male|Median Age Female|Bachelor's degree > 25|Public transportation|
+--------+-----+------------+-------------------+-----------------+-----------------+---------------+-----------------+----------------------+---------------------+
|   00602|   AL|      Auburn|         Lee County|        114105.25|             9277|           41.9|             42.8|                 13.47|                 0.28|
|   00606|   MO|      Neosho|      Newton County|         59377.75|             5943|           43.2|             43.4|                  6.89|                  0.0|
|   00610|   KY|      Anasco|        Pike County|        108219.58|            10220|           40.4|             44.2|                 13.03|                 0.27|
|   00617|