
# Glue Studio Notebook
You are now running a **Glue Studio** notebook; before you can start using your notebook you *must* start an interactive session.

## Available Magics
|          Magic              |   Type       |                                                                        Description                                                                        |
|-----------------------------|--------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------|
| %%configure                 |  Dictionary  |  A JSON-formatted dictionary consisting of all configuration parameters for a session. Each parameter can be specified here or through individual magics. |
| %profile                    |  String      |  Specify a profile in your AWS configuration to use as the credentials provider.                                                                          |
| %iam_role                   |  String      |  Specify an IAM role to execute your session with.                                                                                                        |
| %region                     |  String      |  Specify the AWS region in which to initialize a session.                                                                                                 |
| %session_id                 |  String      |  Returns the session ID for the running session.                                                                                                          |
| %connections                |  List        |  Specify a comma-separated list of connections to use in the session.                                                                                     |
| %additional_python_modules  |  List        |  Comma-separated list of pip packages, S3 paths or private pip arguments.                                                                                 |
| %extra_py_files             |  List        |  Comma-separated list of additional Python files from S3.                                                                                                 |
| %extra_jars                 |  List        |  Comma-separated list of additional JARs to include in the cluster.                                                                                       |
| %number_of_workers          |  Integer     |  The number of workers of a defined worker_type that are allocated when a job runs. worker_type must be set too.                                          |
| %worker_type                |  String      |  Standard, G.1X, *or* G.2X. number_of_workers must be set too. Default is G.1X.                                                                           |
| %glue_version               |  String      |  The version of Glue to be used by this session. Currently, the only valid options are 2.0 and 3.0 (e.g. %glue_version 2.0).                              |
| %security_config            |  String      |  Define a security configuration to be used with this session.                                                                                            |
| %%sql                       |  String      |  Run SQL code. All lines after the initial %%sql magic will be passed as part of the SQL code.                                                            |
| %streaming                  |  String      |  Changes the session type to Glue Streaming.                                                                                                              |
| %etl                        |  String      |  Changes the session type to Glue ETL.                                                                                                                    |
| %status                     |              |  Returns the status of the current Glue session including its duration, configuration and executing user / role.                                          |
| %stop_session               |              |  Stops the current session.                                                                                                                               |
| %list_sessions              |              |  Lists all currently running sessions by name and ID.                                                                                                     |
| %spark_conf                 |  String      |  Specify custom Spark configurations for your session. E.g. %spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer                       |

In [None]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 0.35 
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::098960867156:role/aws-glue-role-project-combinedRole
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 5
Session ID: fa36d89c-4adb-458f-bcfc-9af9ab73a96b
Applying the following default arguments:
--glue_kernel_version 0.35
--enable-glue-datacatalog true
Waiting for session fa36d89c-4adb-458f-bcfc-9af9ab73a96b to get into ready status...
Session fa36d89c-4adb-458f-bcfc-9af9ab73a96b has been created




In [1]:
# Parameters
glue_db = "pyspark_tutorial_db"
zillow_tbl = "zillow"
s3_write_path = "s3://aws-glue-bucket-project-combined/output/"




In [2]:
# Read the Zillow listings data into a Glue DynamicFrame
df1 = glueContext.create_dynamic_frame.from_catalog(database = glue_db, table_name = zillow_tbl )
df1.printSchema()

root
|-- zpid: string
|-- id: choice
|    |-- long
|    |-- string
|-- providerlistingid: choice
|    |-- long
|    |-- string
|-- imgsrc: string
|-- hasimage: string
|-- detailurl: string
|-- statustype: string
|-- statustext: string
|-- countrycurrency: string
|-- price: string
|-- unformattedprice: string
|-- address: string
|-- addressstreet: string
|-- addresscity: string
|-- addressstate: string
|-- addresszipcode: string
|-- isundisclosedaddress: string
|-- beds: string
|-- baths: string
|-- areacode: string
|-- latlong: string
|-- iszillowowned: string
|-- variabledata: string
|-- badgeinfo: string
|-- hdpdata: string
|-- issaved: string
|-- hasopenhouse: string
|-- openhousestartdate: string
|-- openhouseenddate: string
|-- openhousedescription: string
|-- isuserclaimingowner: string
|-- isuserconfirmedclaim: string
|-- pgapt: string
|-- sgapt: string
|-- estimated annual cost: string
|-- shouldshowzestimateasprice: string
|-- has3dmodel: string
|-- hasvideo: string
|-- ishome

In [3]:
# Drop unnecessary columns
df1=df1.drop_fields(["id","providerlistingid","countrycurrency", "statustype","isundisclosedaddress", "latlong","iszillowowned","variabledata","badgeinfo","hdpdata", "issaved", "openhousestartdate", "openhouseenddate","isuserclaimingowner","pgapt","sgapt", "shouldshowzestimateasprice", 
"hasvideo", "ishomerec","availabilitydate" ,"list" , "relaxed" ,  "info6string" , "lotareastring","buildername" , "ispropertyresultcdp" , "streetviewmetadataurl" , "streetviewurl","best_deal" , "homeinfo_streetaddress", "homeinfo_price", "isuserconfirmedclaim", "isfeaturedlisting"])

# print the data schema
df1.printSchema()

root
|-- zpid: string
|-- imgsrc: string
|-- hasimage: string
|-- detailurl: string
|-- statustext: string
|-- price: string
|-- unformattedprice: string
|-- address: string
|-- addressstreet: string
|-- addresscity: string
|-- addressstate: string
|-- addresszipcode: string
|-- beds: string
|-- baths: string
|-- areacode: string
|-- hasopenhouse: string
|-- openhousedescription: string
|-- estimated annual cost: string
|-- has3dmodel: string
|-- hasadditionalattributions: string
|-- brokername: string
|-- homeinfo_latitude: string
|-- homeinfo_longitude: string
|-- homeinfo_hometype: string
|-- homeinfo_lotareavalue: string
|-- homeinfo_lotareaunit: string


In [4]:
# Finding the number of records
df1.count()

7873


In [5]:
# Convert the DynamicFrame to a Spark DataFrame to use standard PySpark functions
data_frame = df1.toDF()




In [6]:
# na.drop(how="any") would drop every row that has a NULL in any column.
# It returns a new DataFrame; because the result is not assigned, data_frame itself is unchanged.
data_frame.na.drop(how="any")

# Count the rows (still the same, since the result above was not assigned)
data_frame.count()

7873
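`na.drop` is a transformation: it returns a new DataFrame and leaves `data_frame` untouched, which is why the count is still 7873. The plain-Python sketch below models the same semantics, assuming rows are dicts and `None` stands in for NULL (`drop_rows_with_nulls` is a hypothetical helper, not a Spark API). It also shows why `how="any"` over every column can remove far more rows than intended when some columns are mostly empty, and how restricting the check to a `subset` of required columns narrows it. In PySpark the assigned form would be `data_frame = data_frame.na.drop(how="any", subset=[...])` with the columns that must be present.

```python
from typing import Dict, Iterable, List, Optional


def drop_rows_with_nulls(rows: List[Dict], subset: Optional[Iterable[str]] = None) -> List[Dict]:
    """Return a NEW list without rows that have None in any checked column.

    Rough analogue of DataFrame.na.drop(how="any", subset=...): the input is never
    modified, just as na.drop never modifies the DataFrame it is called on.
    """
    checked = list(subset) if subset is not None else None
    kept = []
    for row in rows:
        # With no subset, every column of the row is checked.
        cols = checked if checked is not None else list(row)
        if all(row.get(c) is not None for c in cols):
            kept.append(row)
    return kept


listings = [
    {"zpid": "1", "price": "$500,000", "openhousedescription": None},
    {"zpid": "2", "price": None, "openhousedescription": None},
]
print(len(drop_rows_with_nulls(listings)))                    # 0: every row has a None somewhere
print(len(drop_rows_with_nulls(listings, subset=["price"])))  # 1: only the price column is checked
print(len(listings))                                          # 2: the input list is unchanged
```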


In [7]:
# dropDuplicates() also returns a new DataFrame; without assignment, data_frame keeps any duplicate rows
data_frame.dropDuplicates() 

# Count the records again (unchanged for the same reason)
data_frame.count()

7873
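`dropDuplicates()` is also a transformation, so the unassigned call has no effect on `data_frame`. Whole-row comparison can also miss re-listed homes whose price or text changed; deduplicating on a key column such as `zpid` (assuming it identifies a listing) is one alternative, e.g. `data_frame = data_frame.dropDuplicates(["zpid"])`. Below is a plain-Python sketch with a hypothetical `drop_duplicates` helper; unlike this sketch, which keeps the first row it sees, Spark does not guarantee which of the duplicate rows survives.

```python
from typing import Dict, Iterable, List, Optional


def drop_duplicates(rows: List[Dict], subset: Optional[Iterable[str]] = None) -> List[Dict]:
    """Return a new list keeping the first row seen for each distinct key.

    subset=None compares whole rows (like dropDuplicates()); otherwise only the
    listed columns are compared (like dropDuplicates(subset)). Values must be hashable.
    """
    cols = list(subset) if subset is not None else None
    seen = set()
    out = []
    for row in rows:
        # Sorting the items makes the whole-row key independent of dict insertion order.
        key = tuple(sorted(row.items())) if cols is None else tuple(row.get(c) for c in cols)
        if key not in seen:
            seen.add(key)
            out.append(row)
    return out


rows = [
    {"zpid": "1", "price": "$500,000"},
    {"zpid": "1", "price": "$500,000"},  # exact duplicate
    {"zpid": "1", "price": "$495,000"},  # same listing id, new price
]
print(len(drop_duplicates(rows)))                   # 2
print(len(drop_duplicates(rows, subset=["zpid"])))  # 1
```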


In [8]:
# Remove North Carolina listings, since the analysis focuses on California.
# Note: rows with a NULL addressstate are dropped too, because NULL != 'NC' is not TRUE.
data_frame = data_frame.filter(data_frame.addressstate != 'NC')

# Count the records after the removal
data_frame.count()

7492


#### A few NC listings were found and removed (from 7,873 to 7,492 rows)
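Spark evaluates `addressstate != 'NC'` with SQL three-valued logic: for a NULL state the comparison yields NULL rather than TRUE, so `filter` drops that row as well. A small plain-Python model of this, with hypothetical helpers and `None` standing in for NULL:

```python
from typing import Dict, List, Optional


def sql_not_equal(a: Optional[str], b: Optional[str]) -> Optional[bool]:
    """SQL-style <>: None (unknown) if either side is NULL, else an ordinary comparison."""
    if a is None or b is None:
        return None
    return a != b


def filter_not_equal(rows: List[Dict], column: str, excluded: str) -> List[Dict]:
    """Keep rows whose predicate is TRUE; FALSE and unknown are both dropped, as in a Spark filter."""
    return [r for r in rows if sql_not_equal(r.get(column), excluded) is True]


rows = [{"addressstate": "CA"}, {"addressstate": "NC"}, {"addressstate": None}]
print(filter_not_equal(rows, "addressstate", "NC"))  # [{'addressstate': 'CA'}]
```

To keep listings with a missing state, the Spark predicate would need an explicit null check, e.g. `data_frame.addressstate.isNull() | (data_frame.addressstate != 'NC')`.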

In [9]:
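# Convert acre lot sizes to square feet (1 acre = 43,560 sq ft); cast first so both branches are numeric
from pyspark.sql.functions import when
lot_value = data_frame.homeinfo_lotareavalue.cast('double')
data_frame = data_frame.withColumn("homeinfo_lotareavalue",
    when(data_frame.homeinfo_lotareaunit == "acres", lot_value * 43560).otherwise(lot_value))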
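Converting the value and relabeling the unit are two halves of one per-row rule, and the order matters: the value has to be converted while the unit still reads "acres". A plain-Python sketch of that rule, assuming the value is already numeric (`lot_area_in_sqft` and `SQFT_PER_ACRE` are hypothetical names):

```python
from typing import Optional, Tuple

SQFT_PER_ACRE = 43_560  # 1 acre = 43,560 square feet


def lot_area_in_sqft(value: Optional[float], unit: Optional[str]) -> Tuple[Optional[float], Optional[str]]:
    """Return (area, unit) with acre values converted to square feet.

    Any other unit, or a missing value, passes through unchanged.
    """
    if value is not None and unit == "acres":
        return value * SQFT_PER_ACRE, "sqft"
    return value, unit


print(lot_area_in_sqft(0.25, "acres"))   # (10890.0, 'sqft')
print(lot_area_in_sqft(6000.0, "sqft"))  # (6000.0, 'sqft')
```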




In [10]:
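# Relabel acres as sqft to match the converted values; .otherwise() keeps every other unit (without it they would become NULL)
from pyspark.sql.functions import when
data_frame = data_frame.withColumn('homeinfo_lotareaunit',
    when(data_frame.homeinfo_lotareaunit == 'acres', 'sqft').otherwise(data_frame.homeinfo_lotareaunit))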
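Why the `.otherwise()` matters: a Spark `when` without `.otherwise()` yields NULL for every row whose condition is not TRUE, and comparing a NULL unit to "acres" is itself NULL. A small plain-Python model of that behavior, using hypothetical `sql_eq` and `case_when` helpers with `None` standing in for NULL:

```python
from typing import Any, List, Optional


def sql_eq(a: Optional[str], b: str) -> Optional[bool]:
    """SQL '=': a NULL operand gives NULL (None), not False."""
    return None if a is None else a == b


def case_when(condition: Optional[bool], then: Any, otherwise: Any = None) -> Any:
    """One-branch CASE WHEN: only a TRUE condition picks `then`.

    FALSE or NULL falls through to `otherwise`, which defaults to NULL,
    matching a Spark when(...) that has no .otherwise(...).
    """
    return then if condition is True else otherwise


units: List[Optional[str]] = ["acres", "sqft", None]
print([case_when(sql_eq(u, "acres"), "sqft") for u in units])     # ['sqft', None, None]
print([case_when(sql_eq(u, "acres"), "sqft", u) for u in units])  # ['sqft', 'sqft', None]
```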




In [11]:
# Check that no rows still report acres (the output should contain only the header)
data_frame.filter(data_frame.homeinfo_lotareaunit == "acres").show(2)

+----+------+--------+---------+----------+-----+----------------+-------+-------------+-----------+------------+--------------+----+-----+--------+------------+--------------------+---------------------+----------+-------------------------+----------+-----------------+------------------+-----------------+---------------------+--------------------+
|zpid|imgsrc|hasimage|detailurl|statustext|price|unformattedprice|address|addressstreet|addresscity|addressstate|addresszipcode|beds|baths|areacode|hasopenhouse|openhousedescription|estimated annual cost|has3dmodel|hasadditionalattributions|brokername|homeinfo_latitude|homeinfo_longitude|homeinfo_hometype|homeinfo_lotareavalue|homeinfo_lotareaunit|
+----+------+--------+---------+----------+-----+----------------+-------+-------------+-----------+------------+--------------+----+-----+--------+------------+--------------------+---------------------+----------+-------------------------+----------+-----------------+------------------+---------

In [12]:
data_frame = data_frame.withColumn("addresszipcode", data_frame.addresszipcode.cast('int'))




In [13]:
data_frame.show(3)

+---------+--------------------+--------+--------------------+-------------------+-----------+----------------+--------------------+-------------+-------------+------------+--------------+----+-----+--------+------------+--------------------+---------------------+----------+-------------------------+--------------------+-----------------+------------------+-----------------+---------------------+--------------------+
|     zpid|              imgsrc|hasimage|           detailurl|         statustext|      price|unformattedprice|             address|addressstreet|  addresscity|addressstate|addresszipcode|beds|baths|areacode|hasopenhouse|openhousedescription|estimated annual cost|has3dmodel|hasadditionalattributions|          brokername|homeinfo_latitude|homeinfo_longitude|homeinfo_hometype|homeinfo_lotareavalue|homeinfo_lotareaunit|
+---------+--------------------+--------+--------------------+-------------------+-----------+----------------+--------------------+-------------+------------

# School Data

In [14]:
# Parameters
glue_db = "pyspark_tutorial_db"
school_tbl = "school"
s3_write_path = "s3://aws-glue-bucket-project-combined/output/"




In [15]:
# Read the school data into a Glue DynamicFrame
df2 = glueContext.create_dynamic_frame.from_catalog(database = glue_db, table_name = school_tbl )
df2.printSchema()

root
|-- school code: string
|-- school name: string
|-- name: string
|-- address: string
|-- city: string
|-- state: string
|-- zip: string
|-- zip4: string
|-- population: choice
|    |-- long
|    |-- string
|-- county: string
|-- countyfips: choice
|    |-- long
|    |-- string
|-- latitude: string
|-- longitude: string
|-- level_: string
|-- enrollment: string
|-- st_grade: string
|-- end_grade: string
|-- districtid: choice
|    |-- long
|    |-- string
|-- ft_teacher: string
|-- statewide rankå(2019): string
|-- statewide rank(2018): choice
|    |-- int
|    |-- string
|-- statewide rank(2017): choice
|    |-- int
|    |-- string
|-- similar students rank(2019): string
|-- similar students rank(2018): choice
|    |-- int
|    |-- string
|-- similar students rank(2017): choice
|    |-- int
|    |-- string
|-- college/career percent prepared (2019): string
|-- college/career percent prepared (2018): choice
|    |-- int
|    |-- string
|-- college/career percent prepared (2017): ch

In [16]:
# drop unnecessary columns
df2 = df2.drop_fields(paths= [ "name","population", "countyfips","districtid", "school type",
 "statewide rank(2018)","statewide rank(2017)", "similar students rank(2018)","similar students rank(2017)", 
 "college/career percent prepared (2018)","college/career percent prepared (2017)","growth(2017-2019)", "statewide percentile(2018)", "statewide percentile(2017)" ])




In [17]:
df2.printSchema()

root
|-- school code: string
|-- school name: string
|-- address: string
|-- city: string
|-- state: string
|-- zip: string
|-- zip4: string
|-- county: string
|-- latitude: string
|-- longitude: string
|-- level_: string
|-- enrollment: string
|-- st_grade: string
|-- end_grade: string
|-- ft_teacher: string
|-- statewide rankå(2019): string
|-- similar students rank(2019): string
|-- college/career percent prepared (2019): string
|-- school district: string
|-- authorizer: string
|-- statewide percentile(2019): string


In [18]:
df2.count()

10577


In [19]:
# Convert the DynamicFrame to a Spark DataFrame to use standard PySpark functions
data_frame_school = df2.toDF()




In [20]:
# na.drop(how="any") returns a new DataFrame (its schema is shown below); it is not assigned, so data_frame_school is unchanged
data_frame_school.na.drop(how="any")

DataFrame[school code: string, school name: string, address: string, city: string, state: string, zip: string, zip4: string, county: string, latitude: string, longitude: string, level_: string, enrollment: string, st_grade: string, end_grade: string, ft_teacher: string, statewide rankå(2019): string, similar students rank(2019): string, college/career percent prepared (2019): string, school district: string, authorizer: string, statewide percentile(2019): string]


In [21]:
# dropDuplicates() also returns a new DataFrame; it is not assigned, so duplicates remain in data_frame_school
data_frame_school.dropDuplicates()

DataFrame[school code: string, school name: string, address: string, city: string, state: string, zip: string, zip4: string, county: string, latitude: string, longitude: string, level_: string, enrollment: string, st_grade: string, end_grade: string, ft_teacher: string, statewide rankå(2019): string, similar students rank(2019): string, college/career percent prepared (2019): string, school district: string, authorizer: string, statewide percentile(2019): string]


In [22]:
# Keep only California schools (8,922 rows)
data_frame_school = data_frame_school.filter(data_frame_school.state == "CA")

data_frame_school.count()

8922


In [23]:
data_frame_school = data_frame_school.withColumn("enrollment", data_frame_school.enrollment.cast('float'))




In [24]:
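# Exclude rows with enrollment = -999 or '#N/A'. The float cast above turned '#N/A' into NULL,
# and NULL != -999 is not TRUE, so this single filter removes both.
data_frame_school = data_frame_school.filter(data_frame_school.enrollment != -999)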
data_frame_school.count()

8815
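The cast-then-filter pattern in the two cells above can be sketched in plain Python. This assumes -999 is a missing-value placeholder in the source data and that Python's `float()` is an acceptable stand-in for Spark's non-ANSI cast (the two differ on some edge cases); `to_float_or_none`, `valid_enrollments` and `SENTINEL` are hypothetical names.

```python
from typing import List, Optional

SENTINEL = -999.0  # assumed placeholder for "no enrollment reported"


def to_float_or_none(raw: Optional[str]) -> Optional[float]:
    """Rough analogue of cast('float') with ANSI mode off: unparseable text becomes None."""
    if raw is None:
        return None
    try:
        return float(raw)
    except ValueError:
        return None


def valid_enrollments(raw_values: List[Optional[str]]) -> List[float]:
    """Cast each value, then keep it only if value != -999 is TRUE (None is dropped, like NULL)."""
    out = []
    for raw in raw_values:
        value = to_float_or_none(raw)
        if value is not None and value != SENTINEL:
            out.append(value)
    return out


print(valid_enrollments(["512", "-999", "#N/A", None, "1204.0"]))  # [512.0, 1204.0]
```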


In [25]:
# Rename columns for more clarity
data_frame_school = data_frame_school.withColumnRenamed("school code","school_code") \
     .withColumnRenamed("school name","school_name")\
     .withColumnRenamed("address","school_address")\
     .withColumnRenamed("city","school_city")\
     .withColumnRenamed("state","school_state")\
     .withColumnRenamed("zip","school_zipcode")\
     .withColumnRenamed("zip4","school_areacode")\
     .withColumnRenamed("county","school_county")\
     .withColumnRenamed("latitude","school_latitude")\
     .withColumnRenamed("longitude","school_longitude")\
     .withColumnRenamed("level_","school_type")\
     .withColumnRenamed("statewide rankå(2019)","statewide_rank")\
     .withColumnRenamed("similar students rank(2019)","similar_students_rank")\
     .withColumnRenamed("college/career percent prepared (2019)","college_career_percent_prepared")\
     .withColumnRenamed("school district","school_district")\
     .withColumnRenamed("statewide percentile(2019)","statewide_percentile")
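The chained renames above can also be driven from a dictionary, which keeps the old-to-new pairs in one place. A minimal sketch, assuming `df` is a PySpark DataFrame (any object with a `withColumnRenamed(old, new)` method works); `SCHOOL_RENAMES` and `apply_renames` are hypothetical names. Because `withColumnRenamed` is a no-op when the old column does not exist, a typo in a key fails silently, so comparing `df.columns` before and after is a cheap check.

```python
from functools import reduce
from typing import Any, Dict

# Hypothetical mapping: the same old -> new pairs as the chained calls above.
SCHOOL_RENAMES: Dict[str, str] = {
    "school code": "school_code",
    "school name": "school_name",
    "address": "school_address",
    "city": "school_city",
    "state": "school_state",
    "zip": "school_zipcode",
    "zip4": "school_areacode",
    "county": "school_county",
    "latitude": "school_latitude",
    "longitude": "school_longitude",
    "level_": "school_type",
    "statewide rankå(2019)": "statewide_rank",
    "similar students rank(2019)": "similar_students_rank",
    "college/career percent prepared (2019)": "college_career_percent_prepared",
    "school district": "school_district",
    "statewide percentile(2019)": "statewide_percentile",
}


def apply_renames(df: Any, mapping: Dict[str, str]) -> Any:
    """Call df.withColumnRenamed(old, new) once per mapping entry and return the final DataFrame."""
    return reduce(lambda acc, pair: acc.withColumnRenamed(pair[0], pair[1]), mapping.items(), df)


# In the notebook (not run here):
# data_frame_school = apply_renames(data_frame_school, SCHOOL_RENAMES)
```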




In [26]:
data_frame_school.printSchema()

root
 |-- school_code: string (nullable = true)
 |-- school_name: string (nullable = true)
 |-- school_address: string (nullable = true)
 |-- school_city: string (nullable = true)
 |-- school_state: string (nullable = true)
 |-- school_zipcode: string (nullable = true)
 |-- school_areacode: string (nullable = true)
 |-- school_county: string (nullable = true)
 |-- school_latitude: string (nullable = true)
 |-- school_longitude: string (nullable = true)
 |-- school_type: string (nullable = true)
 |-- enrollment: float (nullable = true)
 |-- st_grade: string (nullable = true)
 |-- end_grade: string (nullable = true)
 |-- ft_teacher: string (nullable = true)
 |-- statewide_rank: string (nullable = true)
 |-- similar_students_rank: string (nullable = true)
 |-- college_career_percent_prepared: string (nullable = true)
 |-- school_district: string (nullable = true)
 |-- authorizer: string (nullable = true)
 |-- statewide_percentile: string (nullable = true)


In [27]:
data_frame_school.show(2)

+-----------+--------------------+---------------+-----------+------------+--------------+---------------+-------------+---------------+----------------+-----------+----------+--------+---------+----------+--------------+---------------------+-------------------------------+--------------------+------------------+--------------------+
|school_code|         school_name| school_address|school_city|school_state|school_zipcode|school_areacode|school_county|school_latitude|school_longitude|school_type|enrollment|st_grade|end_grade|ft_teacher|statewide_rank|similar_students_rank|college_career_percent_prepared|     school_district|        authorizer|statewide_percentile|
+-----------+--------------------+---------------+-----------+------------+--------------+---------------+-------------+---------------+----------------+-----------+----------+--------+---------+----------+--------------+---------------------+-------------------------------+--------------------+------------------+-----------

In [28]:
data_frame_school = data_frame_school.repartition(1)




### Aggregate School Data

In [29]:
# Note: importing sum and max this way shadows Python's built-in sum() and max() in this notebook
from pyspark.sql.functions import sum,avg,max,count




In [1]:
# Change data types where required

In [30]:
data_frame_school = data_frame_school.withColumn("statewide_rank", data_frame_school.statewide_rank.cast('float'))




In [31]:
data_frame_school = data_frame_school.withColumn("school_zipcode", data_frame_school.school_zipcode.cast('int'))




In [32]:
data_frame_school_aggr=data_frame_school




In [33]:
data_frame_school_aggr = data_frame_school.groupBy("school_zipcode").agg(sum("enrollment").alias("Sum School Enrollment"),\
                                                                                   avg('statewide_rank').alias("Avg School Rank"))
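The `groupBy(...).agg(...)` above produces one row per zip code. Assuming rows are dicts with the renamed column names, a plain-Python sketch of the same aggregation (hypothetical `aggregate_by_zip`, toy values not taken from the dataset) follows; like Spark's `sum` and `avg`, it skips NULLs and returns NULL for a group with no non-null values. It relies on Python's built-in `sum`, which the `from pyspark.sql.functions import sum` line shadows inside the notebook.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple


def aggregate_by_zip(rows: List[Dict]) -> Dict[Optional[int], Tuple[Optional[float], Optional[float]]]:
    """Per zip code: (sum of enrollment, average of statewide_rank), ignoring None values."""
    groups = defaultdict(lambda: ([], []))
    for row in rows:
        enrollments, ranks = groups[row.get("school_zipcode")]
        if row.get("enrollment") is not None:
            enrollments.append(row["enrollment"])
        if row.get("statewide_rank") is not None:
            ranks.append(row["statewide_rank"])
    return {
        zip_code: (
            sum(enrollments) if enrollments else None,
            sum(ranks) / len(ranks) if ranks else None,
        )
        for zip_code, (enrollments, ranks) in groups.items()
    }


schools = [
    {"school_zipcode": 90001, "enrollment": 400.0, "statewide_rank": 4.0},
    {"school_zipcode": 90001, "enrollment": 600.0, "statewide_rank": 6.0},
    {"school_zipcode": 90002, "enrollment": 250.0, "statewide_rank": None},
]
print(aggregate_by_zip(schools))  # {90001: (1000.0, 5.0), 90002: (250.0, None)}
```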




In [34]:
data_frame_school_aggr.show(2)

+--------------+---------------------+------------------+
|school_zipcode|Sum School Enrollment|   Avg School Rank|
+--------------+---------------------+------------------+
|         93560|               2724.0|               1.5|
|         95127|              10918.0|4.3478260869565215|
+--------------+---------------------+------------------+
only showing top 2 rows


In [35]:
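# Show the overall mean of the per-zip average ranks (not assigned: .show() returns None, and assigning it to avg would overwrite the avg function)
data_frame_school_aggr.agg(avg('Avg School Rank')).show()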

+--------------------+
|avg(Avg School Rank)|
+--------------------+
|   5.678037946710747|
+--------------------+


In [36]:
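# Fill missing per-zip average ranks with the overall mean above (5.678) and keep the result
data_frame_school_aggr = data_frame_school_aggr.na.fill(value=5.678, subset=["Avg School Rank"])
data_frame_school_aggr.show()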

+--------------+---------------------+------------------+
|school_zipcode|Sum School Enrollment|   Avg School Rank|
+--------------+---------------------+------------------+
|         93560|               2724.0|               1.5|
|         95127|              10918.0|4.3478260869565215|
|         95476|               4177.0| 4.111111111111111|
|         94553|               3862.0|               6.4|
|         91352|               9385.0| 3.727272727272727|
|         90019|               4065.0|             4.375|
|         92704|              14937.0|               4.0|
|         91007|               6460.0| 9.833333333333334|
|         93021|               7644.0|              8.25|
|         94303|               2263.0|               3.8|
|         93727|              12855.0| 4.714285714285714|
|         92835|               2022.0| 9.333333333333334|
|         94558|              11150.0|               6.0|
|         92233|               1091.0| 4.666666666666667|
|         9223
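The fill value 5.678 is the rounded overall mean printed two cells earlier, i.e. simple mean imputation. That mean averages the per-zip averages, so every zip code counts equally regardless of how many schools it has; a school-level mean can differ. A plain-Python sketch with a hypothetical `fill_with_mean` helper and toy numbers:

```python
from typing import List, Optional


def fill_with_mean(values: List[Optional[float]]) -> List[float]:
    """Replace None with the mean of the non-None values."""
    present = [v for v in values if v is not None]
    if not present:
        raise ValueError("no non-null values to average")
    mean = sum(present) / len(present)
    return [mean if v is None else v for v in values]


zip_avg_ranks = [2.0, 8.0, None]  # zip A has 1 school (rank 2), zip B has 3 schools (rank 8 each)
print(fill_with_mean(zip_avg_ranks))  # [2.0, 8.0, 5.0]

school_ranks = [2.0, 8.0, 8.0, 8.0]  # the same schools, weighted individually
print(sum(school_ranks) / len(school_ranks))  # 6.5
```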

### Left-join housing data with school data

In [37]:
# Left join: every listing is kept; zip codes with no schools get NULL school columns
dataframe_housing_school = data_frame.join(data_frame_school_aggr, data_frame.addresszipcode == data_frame_school_aggr.school_zipcode, "left")
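A plain-Python sketch of the left join (hypothetical `left_join_on_zip`, toy data, `None` as NULL). Because the school table was aggregated to one row per zip code first, the join cannot multiply listings; joining the raw school rows instead would repeat each listing once per school in its zip.

```python
from typing import Dict, List, Optional


def left_join_on_zip(listings: List[Dict], zip_stats: Dict[Optional[int], Dict]) -> List[Dict]:
    """Keep every listing and attach the school stats for its zip code.

    Listings whose zip has no entry, or whose zip is None, get None for every stats
    column; a NULL key never matches in a Spark join either.
    """
    stat_cols = sorted({col for stats in zip_stats.values() for col in stats})
    joined = []
    for listing in listings:
        zip_code = listing.get("addresszipcode")
        stats = zip_stats.get(zip_code) if zip_code is not None else None
        extra = {col: (stats.get(col) if stats else None) for col in stat_cols}
        joined.append({**listing, **extra})
    return joined


listings = [{"zpid": "1", "addresszipcode": 90001}, {"zpid": "2", "addresszipcode": 99999}]
zip_stats = {90001: {"Sum School Enrollment": 1000.0, "Avg School Rank": 5.0}}
for row in left_join_on_zip(listings, zip_stats):
    print(row)
```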




In [38]:
dataframe_housing_school.show(3)

+--------+--------------------+--------+--------------------+--------------+-----------+----------------+--------------------+-----------------+--------------+------------+--------------+----+-----+--------+------------+--------------------+---------------------+----------+-------------------------+--------------------+-----------------+------------------+-----------------+---------------------+--------------------+--------------+---------------------+---------------+
|    zpid|              imgsrc|hasimage|           detailurl|    statustext|      price|unformattedprice|             address|    addressstreet|   addresscity|addressstate|addresszipcode|beds|baths|areacode|hasopenhouse|openhousedescription|estimated annual cost|has3dmodel|hasadditionalattributions|          brokername|homeinfo_latitude|homeinfo_longitude|homeinfo_hometype|homeinfo_lotareavalue|homeinfo_lotareaunit|school_zipcode|Sum School Enrollment|Avg School Rank|
+--------+--------------------+--------+--------------

In [39]:
dataframe_housing_school.printSchema()

root
 |-- zpid: string (nullable = true)
 |-- imgsrc: string (nullable = true)
 |-- hasimage: string (nullable = true)
 |-- detailurl: string (nullable = true)
 |-- statustext: string (nullable = true)
 |-- price: string (nullable = true)
 |-- unformattedprice: string (nullable = true)
 |-- address: string (nullable = true)
 |-- addressstreet: string (nullable = true)
 |-- addresscity: string (nullable = true)
 |-- addressstate: string (nullable = true)
 |-- addresszipcode: integer (nullable = true)
 |-- beds: string (nullable = true)
 |-- baths: string (nullable = true)
 |-- areacode: string (nullable = true)
 |-- hasopenhouse: string (nullable = true)
 |-- openhousedescription: string (nullable = true)
 |-- estimated annual cost: string (nullable = true)
 |-- has3dmodel: string (nullable = true)
 |-- hasadditionalattributions: string (nullable = true)
 |-- brokername: string (nullable = true)
 |-- homeinfo_latitude: string (nullable = true)
 |-- homeinfo_longitude: string (nullable =

In [40]:
data_frame_school.printSchema()

root
 |-- school_code: string (nullable = true)
 |-- school_name: string (nullable = true)
 |-- school_address: string (nullable = true)
 |-- school_city: string (nullable = true)
 |-- school_state: string (nullable = true)
 |-- school_zipcode: integer (nullable = true)
 |-- school_areacode: string (nullable = true)
 |-- school_county: string (nullable = true)
 |-- school_latitude: string (nullable = true)
 |-- school_longitude: string (nullable = true)
 |-- school_type: string (nullable = true)
 |-- enrollment: float (nullable = true)
 |-- st_grade: string (nullable = true)
 |-- end_grade: string (nullable = true)
 |-- ft_teacher: string (nullable = true)
 |-- statewide_rank: float (nullable = true)
 |-- similar_students_rank: string (nullable = true)
 |-- college_career_percent_prepared: string (nullable = true)
 |-- school_district: string (nullable = true)
 |-- authorizer: string (nullable = true)
 |-- statewide_percentile: string (nullable = true)


In [41]:
dataframe_housing_school = dataframe_housing_school.repartition(1)




In [42]:
# Convert the combined DataFrame back to a DynamicFrame
dynamic_frame_combined = DynamicFrame.fromDF(dataframe_housing_school, glueContext, "dynamic_frame_write")




In [43]:
# Write the combined data to S3 as CSV
glueContext.write_dynamic_frame.from_options(
    frame = dynamic_frame_combined,
    connection_type = "s3",
    connection_options = {
        "path": s3_write_path,
    },
    format = "csv"
)

<awsglue.dynamicframe.DynamicFrame object at 0x7fafbcc774d0>


In [44]:
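# Convert the school DataFrame back to a DynamicFrame
dynamic_frame_school = DynamicFrame.fromDF(data_frame_school, glueContext, "dynamic_frame_write")

# Write the school data to S3 as CSV. This uses the same prefix as the combined data,
# so both outputs (with different schemas) end up side by side under s3_write_path.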
glueContext.write_dynamic_frame.from_options(
    frame = dynamic_frame_school,
    connection_type = "s3",
    connection_options = {
        "path": s3_write_path,
    },
    format = "csv"
)

<awsglue.dynamicframe.DynamicFrame object at 0x7fafbe2b4310>
