# ETL Example using Google Colab/PySpark

---



In [9]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [111]:
import os
import findspark
from pyspark.sql.functions import col, udf, round, count
from pyspark.sql.types import StringType, IntegerType, FloatType
from google.colab import drive
drive.mount('/content/drive')
%cd /content/drive/MyDrive/ColabDemo/food-insecurity-machine-learning/


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
/content/drive/MyDrive/ColabDemo/food-insecurity-machine-learning


In [3]:
# Activate Spark in our Colab notebook.
spark_version = 'spark-3.3.2'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
findspark.init()

Hit:1 https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/ InRelease
Ign:2 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu2004/x86_64  InRelease
Hit:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64  InRelease
Hit:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu2004/x86_64  Release
Hit:5 http://security.ubuntu.com/ubuntu focal-security InRelease
Hit:6 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu focal InRelease
Hit:7 http://archive.ubuntu.com/ubuntu focal InRelease
Hit:9 http://ppa.launchpad.net/cran/libgit2/ubuntu focal InRelease
Hit:10 http://archive.ubuntu.com/ubuntu focal-updates InRelease
Hit:11 http://archive.ubuntu.com/ubuntu focal-backports InRelease
Hit:12 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu focal InRelease
Hit:13 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu focal InRelease
Hit:14 http://ppa.launchpad.net/ubuntugis/ppa/ubuntu focal InRelease
Reading package list

In [4]:
# Get postgresql package
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2023-03-04 00:46:31--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar.2’


2023-03-04 00:46:33 (1.24 MB/s) - ‘postgresql-42.2.9.jar.2’ saved [914037/914037]



In [5]:
# Import Spark and create (or reuse, via getOrCreate) a SparkSession.
# The PostgreSQL JDBC jar goes on the driver classpath so the JDBC write at the
# end of the notebook can load org.postgresql.Driver.
from pyspark.sql import SparkSession

builder = SparkSession.builder.appName("food-insecurity-ML")
builder = builder.config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar")
spark = builder.getOrCreate()

In [6]:
# pyspark is already installed by the first code cell of this notebook;
# reinstalling it here is unnecessary, so this cell intentionally does nothing.

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [72]:
from pyspark import SparkFiles
from pyspark.sql.functions import substring, col, format_string

# Read the CSV file into a DataFrame with an inferred schema
url = "https://gtbootcamp20230221.s3.amazonaws.com/FoodAccessResearchAtlasData2019.csv"
spark.sparkContext.addFile(url)
df = spark.read.option('header', 'true').csv(SparkFiles.get("FoodAccessResearchAtlasData2019.csv"), inferSchema=True, sep=',')

# Render CensusTract as an 11-character, zero-padded string. '%011d' assumes the
# inferred type is integral; format_string already returns a string, so no cast
# is needed.
df = df.withColumn('CensusTract', format_string('%011d', 'CensusTract'))

# Derive the StateFIPS code from the first two characters
# (Spark's substring position is 1-based).
df = df.withColumn('StateFIPS', substring(col('CensusTract'), 1, 2))

df.show()

+-----------+-------+--------------+-----+-------+-------+-----------------+--------+--------+-----------------+--------------------+-----------------+------------------+--------+---------------+-----------+------------------+--------+-----------+--------+-------------+---------+----------+----------+------------------+---------+----------+---------+----------+-----------+----------+---------+--------------+----------+---------------+----------+---------------+-------------+------------------+-----------+----------------+-----------+----------------+-----------+----------------+-----------+----------------+----------+---------------+-------------+------------------+----------+---------------+----------+---------------+----------+---------------+------+-----------+-------+------------+-------+------------+----------+---------------+--------+-------------+--------+-------------+--------+-------------+--------+-------------+-------+------------+----------+---------------+-------+---------

In [73]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

# Columns to store as 32-bit integers
int_cols = {'MedianFamilyIncome', 'TractLOWI', 'TractKids', 'TractSeniors', 'TractWhite',
            'TractBlack', 'TractAsian', 'TractNHOPI', 'TractAIAN', 'TractOMultir',
            'TractHispanic', 'TractHUNV', 'TractSNAP'}

# Columns to store as 32-bit floats (FloatType, not decimal). Each name appears
# once; the earlier list repeated 'lawhite10share'.
float_cols = {'PovertyRate',
              'lapophalfshare', 'lalowihalfshare', 'lakidshalfshare', 'laseniorshalfshare',
              'lawhitehalfshare', 'lablackhalfshare', 'laasianhalfshare', 'lanhopihalfshare',
              'laaianhalfshare', 'laomultirhalfshare', 'lahisphalfshare', 'lahunvhalfshare',
              'lasnaphalfshare',
              'lapop10share', 'lalowi10share', 'lakids10share', 'laseniors10share',
              'lawhite10share', 'lablack10share', 'laasian10share', 'lanhopi10share',
              'laaian10share', 'laomultir10share', 'lahisp10share', 'lahunv10share',
              'lasnap10share'}


def cast_column(name):
  """Return column `name`, cast to int or float when listed above, else unchanged."""
  if name in int_cols:
    return col(name).cast('int').alias(name)
  if name in float_cols:
    return col(name).cast('float').alias(name)
  return col(name)


# Apply both integer and float casts in a single projection, keeping column order
df = df.select([cast_column(c) for c in df.columns])

# display data
df.show()


+-----------+-------+--------------+-----+-------+-------+-----------------+--------+--------+-----------------+--------------------+-----------------+------------------+--------+---------------+-----------+------------------+--------+-----------+--------+-------------+---------+----------+----------+------------------+---------+----------+---------+----------+-----------+----------+---------+--------------+----------+---------------+----------+---------------+-------------+------------------+-----------+----------------+-----------+----------------+-----------+----------------+-----------+----------------+----------+---------------+-------------+------------------+----------+---------------+----------+---------------+----------+---------------+------+-----------+-------+------------+-------+------------+----------+---------------+--------+-------------+--------+-------------+--------+-------------+--------+-------------+-------+------------+----------+---------------+-------+---------

In [83]:
# Number of tracts for each OHU2010 value, in ascending OHU2010 order
freq_count = (
    df.groupBy('OHU2010')
      .agg(count('*').alias('frequency'))
      .orderBy('OHU2010')
)

freq_count.show()

+-------+---------+
|OHU2010|frequency|
+-------+---------+
|      0|      106|
|      1|       86|
|      2|       41|
|      3|       31|
|      4|       24|
|      5|       15|
|      6|        8|
|      7|       10|
|      8|       15|
|      9|        6|
|     10|       11|
|     11|        5|
|     12|       12|
|     13|        7|
|     14|        3|
|     15|        7|
|     16|        3|
|     17|        2|
|     18|        3|
|     19|        8|
+-------+---------+
only showing top 20 rows



In [99]:
# define function to divide two columns
def find_nulls(x, y):
  """Return ``x`` as a percentage of ``y`` (``x / y * 100``).

  Despite its name this does not search for nulls: it is a null-safe
  division helper (wrapped as a Spark UDF below).

  Args:
    x: numerator, or None.
    y: denominator, or None.

  Returns:
    ``x / y * 100``, or None when either operand is None or ``y`` is 0,
    so tracts with a zero denominator (e.g. OHU2010 == 0) produce NULL
    instead of raising ZeroDivisionError.
  """
  if x is None or y is None or y == 0:
    return None
  return x / y * 100

from pyspark.sql.functions import when

# Python UDF wrapper around find_nulls; kept so any later cell that references
# `divide_cols` still works. The columns below use a native expression instead.
divide_cols = udf(find_nulls, FloatType())


def pct_of(numerator, denominator):
  """Return a Column holding `numerator` as a percentage of `denominator`.

  Same semantics as `divide_cols`: NULL when either column is NULL or the
  denominator is 0. The result is rounded to 2 decimals and cast to float.
  Built-in Spark expressions are evaluated in the JVM, avoiding the per-row
  Python serialization a Python UDF requires.
  """
  ratio = when(col(denominator) != 0, col(numerator) / col(denominator) * 100)
  return round(ratio, 2).cast('float')


# calculate percent population of each group per tract
for group in ['LOWI', 'Kids', 'Seniors', 'White', 'Black',
              'Asian', 'NHOPI', 'AIAN', 'OMultir', 'Hispanic']:
  df = df.withColumn(f'Tract{group}_PCT', pct_of(f'Tract{group}', 'Pop2010'))

# calculate percent household of each group per tract
for group in ['HUNV', 'SNAP']:
  df = df.withColumn(f'Tract{group}_PCT', pct_of(f'Tract{group}', 'OHU2010'))

# display dataframe
df.show()
                  


+-----------+-------+--------------+-----+-------+-------+-----------------+--------+--------+-----------------+--------------------+-----------------+------------------+--------+---------------+-----------+------------------+--------+-----------+--------+-------------+---------+----------+----------+------------------+---------+----------+---------+----------+-----------+----------+---------+--------------+----------+---------------+----------+---------------+-------------+------------------+-----------+----------------+-----------+----------------+-----------+----------------+-----------+----------------+----------+---------------+-------------+------------------+----------+---------------+----------+---------------+----------+---------------+------+-----------+-------+------------+-------+------------+----------+---------------+--------+-------------+--------+-------------+--------+-------------+--------+-------------+-------+------------+----------+---------------+-------+---------

# 1st DataFrame (71,782 rows)

In [100]:
# Columns for the first table, in output order: identifiers and tract
# attributes, the raw per-group counts, then the derived *_PCT columns.
df1_columns = [
    'CensusTract', 'StateFIPS', 'State', 'County', 'Urban',
    'Pop2010', 'OHU2010', 'PovertyRate', 'MedianFamilyIncome', 'LAhalfand10',
    'TractLOWI', 'TractKids', 'TractSeniors', 'TractWhite', 'TractBlack',
    'TractAsian', 'TractNHOPI', 'TractAIAN', 'TractOMultir', 'TractHispanic',
    'TractHUNV', 'TractSNAP',
    'TractLOWI_PCT', 'TractKids_PCT', 'TractSeniors_PCT', 'TractWhite_PCT',
    'TractBlack_PCT', 'TractAsian_PCT', 'TractNHOPI_PCT', 'TractAIAN_PCT',
    'TractOMultir_PCT', 'TractHispanic_PCT', 'TractHUNV_PCT', 'TractSNAP_PCT',
]
df1 = df.select(*map(col, df1_columns))

# display dataframe
df1.show()

+-----------+---------+-------+--------------+-----+-------+-------+-----------+------------------+-----------+---------+---------+------------+----------+----------+----------+----------+---------+------------+-------------+---------+---------+-------------+-------------+----------------+--------------+--------------+--------------+--------------+-------------+----------------+-----------------+-------------+-------------+
|CensusTract|StateFIPS|  State|        County|Urban|Pop2010|OHU2010|PovertyRate|MedianFamilyIncome|LAhalfand10|TractLOWI|TractKids|TractSeniors|TractWhite|TractBlack|TractAsian|TractNHOPI|TractAIAN|TractOMultir|TractHispanic|TractHUNV|TractSNAP|TractLOWI_PCT|TractKids_PCT|TractSeniors_PCT|TractWhite_PCT|TractBlack_PCT|TractAsian_PCT|TractNHOPI_PCT|TractAIAN_PCT|TractOMultir_PCT|TractHispanic_PCT|TractHUNV_PCT|TractSNAP_PCT|
+-----------+---------+-------+--------------+-----+-------+-------+-----------+------------------+-----------+---------+---------+------------+

In [101]:
# Print the column names, data types and nullability of the df1 subset
df1.printSchema()

root
 |-- CensusTract: string (nullable = false)
 |-- StateFIPS: string (nullable = false)
 |-- State: string (nullable = true)
 |-- County: string (nullable = true)
 |-- Urban: integer (nullable = true)
 |-- Pop2010: integer (nullable = true)
 |-- OHU2010: integer (nullable = true)
 |-- PovertyRate: float (nullable = true)
 |-- MedianFamilyIncome: integer (nullable = true)
 |-- LAhalfand10: integer (nullable = true)
 |-- TractLOWI: integer (nullable = true)
 |-- TractKids: integer (nullable = true)
 |-- TractSeniors: integer (nullable = true)
 |-- TractWhite: integer (nullable = true)
 |-- TractBlack: integer (nullable = true)
 |-- TractAsian: integer (nullable = true)
 |-- TractNHOPI: integer (nullable = true)
 |-- TractAIAN: integer (nullable = true)
 |-- TractOMultir: integer (nullable = true)
 |-- TractHispanic: integer (nullable = true)
 |-- TractHUNV: integer (nullable = true)
 |-- TractSNAP: integer (nullable = true)
 |-- TractLOWI_PCT: float (nullable = true)
 |-- TractKids_PC

In [106]:
# Keep only rows with a value in every column. Rows whose Pop2010 or OHU2010
# is 0 (or NULL) have NULL *_PCT values and are removed here.
df1 = df1.dropna(how='any')

# count records
record_total = df1.count()
print(f'Total Records: {record_total}')

# display dataframe
df1.show()

Total Records: 71782
+-----------+---------+-------+--------------+-----+-------+-------+-----------+------------------+-----------+---------+---------+------------+----------+----------+----------+----------+---------+------------+-------------+---------+---------+-------------+-------------+----------------+--------------+--------------+--------------+--------------+-------------+----------------+-----------------+-------------+-------------+
|CensusTract|StateFIPS|  State|        County|Urban|Pop2010|OHU2010|PovertyRate|MedianFamilyIncome|LAhalfand10|TractLOWI|TractKids|TractSeniors|TractWhite|TractBlack|TractAsian|TractNHOPI|TractAIAN|TractOMultir|TractHispanic|TractHUNV|TractSNAP|TractLOWI_PCT|TractKids_PCT|TractSeniors_PCT|TractWhite_PCT|TractBlack_PCT|TractAsian_PCT|TractNHOPI_PCT|TractAIAN_PCT|TractOMultir_PCT|TractHispanic_PCT|TractHUNV_PCT|TractSNAP_PCT|
+-----------+---------+-------+--------------+-----+-------+-------+-----------+------------------+-----------+---------+--

# 2nd DataFrame (7,708 rows)

In [120]:
# Second table: every column of df except these six LAPOP*/LALOWI* columns
columns_to_remove = ['LAPOP1_10', 'LAPOP05_10', 'LAPOP1_20',
                     'LALOWI1_10', 'LALOWI05_10', 'LALOWI1_20']
df2 = df.drop(*columns_to_remove)
df2.show()

+-----------+-------+--------------+-----+-------+-------+-----------------+--------+--------+-----------------+--------------------+-----------------+------------------+--------+---------------+-----------+------------------+--------+-----------+--------+-------------+---------+----------+----------+------------------+---------+--------------+----------+---------------+----------+---------------+-------------+------------------+-----------+----------------+-----------+----------------+-----------+----------------+-----------+----------------+----------+---------------+-------------+------------------+----------+---------------+----------+---------------+----------+---------------+------+-----------+-------+------------+-------+------------+----------+---------------+--------+-------------+--------+-------------+--------+-------------+--------+-------------+-------+------------+----------+---------------+-------+------------+-------+------------+-------+------------+-------+-----------

# Load into the AWS RDS PostgreSQL database

In [116]:
from config import host, database_name, username, password

# JDBC connection settings for the PostgreSQL target. Credentials are read from
# the local `config` module rather than written into the notebook.
# NOTE(review): with "append", re-running the write cell inserts the rows again.
mode = "append"
jdbc_url = f"jdbc:postgresql://{host}:5432/{database_name}"
config = dict(user=username,
              password=password,
              driver="org.postgresql.Driver")

In [119]:
# write df1 to table in RDS, using the save mode and connection properties above
df1.write.mode(mode).jdbc(jdbc_url, 'food_access_1', properties=config)
