In [3]:
# This script processes a dataset available in an S3 bucket conataining information about broadband availability across the United States.            # Specifically, the data shows available broadband speeds,the broadband         # infrastrucure technologies, and the provider names for broadband systems      # across the more than 11 million US census blocks in the United States.
# The data source, file structure including variable names, and other documentation for the broadband dataset is available here: 
# https://opendata.fcc.gov/Wireline/Fixed-Broadband-Deployment-Data-June-2017-Status-V/9r8r-g7ut

# If PYTHONPATH is not set, findspark and findspark.init() will find it on your machine 
import findspark
findspark.init()

import re
import sys
import spark
from pyspark import SparkContext
from pyspark.sql.types import *
import os
import sys
from pyspark.sql.functions import size

from pyspark.sql.functions import substring, length, col, expr


# schemaString = 'something'

# fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]

from pyspark.sql import SparkSession
spark = SparkSession \
        .builder \
        .master('local') \
        .appName("BroadbandScout")\
        .getOrCreate()

sc = SparkContext.getOrCreate()


pathname_input = 's3a://sparkforinsightproject/Fixed_Broadband_Deployment_Data__June__2017_Status_V1.csv'

pathname_output = 's3a://sparkforinsightproject/database_data/sparkdf_broadband_output_2'

sc = SparkContext.getOrCreate()

def etl_broadband(input_data_txt):

    broadband_schema = StructType([
        StructField("Logical Record Number", StringType(), True),
        StructField("Provider ID", StringType(), True),
        StructField("FRN", StringType(), True),
        StructField("Provider Name", StringType(), True),
        StructField("DBA Name", StringType(), True),
        StructField("Holding Company Name", StringType(), True),
        StructField("Holding Company Number", StringType(), True),
        StructField("Holding Company Final", StringType(), True),
        StructField("State", StringType(), True),
        StructField("Census Block FIPS Code", StringType(), True),
        StructField("Technology Code", StringType(), True),
        StructField("Consumer", StringType(), True),
        StructField("Max Advertised Downstream Speed (mbps)", IntegerType(), True),
        StructField("Max Advertised Upstream Speed (mbps)", IntegerType(), True),
        StructField("Business", StringType(), True),
        StructField("Max CIR Downstream Speed (mbps)", IntegerType(), True),
        StructField("Max CIR Upstream Speed (mbps)", IntegerType(), True)
        ])

    df_BROADBAND = spark.read.csv(input_data_txt, quote='"', header=True, sep=',', nullValue='NA', schema=broadband_schema)

    df_BROADBAND = df_BROADBAND\
                               .withColumnRenamed("DBA Name", "dba_name")\
                               .withColumnRenamed("State", "state")\
                               .withColumnRenamed("Census Block FIPS Code", "census_block")\
                               .withColumnRenamed("Technology Code", "technology")\
                               .withColumnRenamed("Max Advertised Downstream Speed (mbps)", "ma_downspeed")\
                               .withColumnRenamed("Max Advertised Upstream Speed (mbps)", "ma_upspeed")\
                               .withColumnRenamed("Max CIR Downstream Speed (mbps)",  "mc_downspeed")\
                               .withColumnRenamed("Max CIR Upstream Speed (mbps)",  "mc_upspeed")


    # This code selects and saves just seven of the 16 columns from the 
    # original file that have some clear potential value for the database.

    df_BROADBAND = df_BROADBAND\
                                       .select(
                                       "dba_name",\
                                       "census_block",\
                                       "state",\
                                       "technology",\
                                       "ma_downspeed",\
                                       "ma_upspeed",\
                                       "mc_downspeed",\
                                       "mc_upspeed")

    return df_BROADBAND

# def main():
#     input_data_txt = sys.argv[1]
#     output_data_txt = sys.argv[2]
#     extract_transform_load_broadband(input_data_txt, output_data_txt)

# if __name__ == '__main__':
#     main()



output_df_broadband = etl_broadband(pathname_input)


output_df_broadband.show()

+--------------------+---------------+-----+----------+------------+----------+------------+----------+
|            dba_name|   census_block|state|technology|ma_downspeed|ma_upspeed|mc_downspeed|mc_upspeed|
+--------------------+---------------+-----+----------+------------+----------+------------+----------+
|   unWired Broadband|060770053034016|   CA|        70|          21|         5|         200|       200|
|                  t6|550590029041059|   WI|        70|           5|         1|        1000|      1000|
|   Veracity Networks|490351031002010|   UT|        30|          12|        12|          12|        12|
|Monmouth Telephon...|340155016082005|   NJ|        50|           0|         0|         100|       100|
|Morris Broadband,...|371139706003009|   NC|        42|         100|         5|         100|         5|
|                null|           null| null|      null|        null|      null|        null|      null|
|Great Basin Inter...|320310012012053|   NV|        70|         

## Create Washington only dataframe

In [85]:

df_BROADBAND_WA_ONLY = output_df_broadband.filter(output_df_broadband["state"] == "WA")
    

df_BROADBAND_WA_ONLY.show()

+--------------------+---------------+-----+----------+------------+----------+------------+----------+
|            dba_name|   census_tract|state|technology|ma_downspeed|ma_upspeed|mc_downspeed|mc_upspeed|
+--------------------+---------------+-----+----------+------------+----------+------------+----------+
| StarTouch Broadband|530770018004053|   WA|        70|          25|        25|        1000|      1000|
| StarTouch Broadband|530330304013022|   WA|        70|           0|         0|        1000|      1000|
| StarTouch Broadband|530439604003069|   WA|        70|          25|        25|        1000|      1000|
| StarTouch Broadband|530610402002017|   WA|        70|          25|        25|        1000|      1000|
| StarTouch Broadband|530730102002089|   WA|        70|          25|        25|        1000|      1000|
| StarTouch Broadband|530750009003056|   WA|        70|          25|        25|        1000|      1000|
| StarTouch Broadband|530330029003005|   WA|        70|         

In [3]:
df_BROADBAND_WA_ONLY.count() # The row count of the WA only dataset is 1,378,559

1378559

# Create new column with census tract 4 digits removed

In [86]:
# Removes last 4 characters of census_tract_column:


df_BROADBAND_WA_ONLY = df_BROADBAND_WA_ONLY.withColumn("census_tract_11",expr("substring(census_tract, 1, 11)"))

df_BROADBAND_WA_ONLY.show()

+--------------------+---------------+-----+----------+------------+----------+------------+----------+---------------+
|            dba_name|   census_tract|state|technology|ma_downspeed|ma_upspeed|mc_downspeed|mc_upspeed|census_tract_11|
+--------------------+---------------+-----+----------+------------+----------+------------+----------+---------------+
| StarTouch Broadband|530770018004053|   WA|        70|          25|        25|        1000|      1000|    53077001800|
| StarTouch Broadband|530330304013022|   WA|        70|           0|         0|        1000|      1000|    53033030401|
| StarTouch Broadband|530439604003069|   WA|        70|          25|        25|        1000|      1000|    53043960400|
| StarTouch Broadband|530610402002017|   WA|        70|          25|        25|        1000|      1000|    53061040200|
| StarTouch Broadband|530730102002089|   WA|        70|          25|        25|        1000|      1000|    53073010200|
| StarTouch Broadband|530750009003056|  

In [87]:
# Transform values in Technology column to make Fiber (50) equl to 1 and non-fiber equal to 0

from pyspark.sql.functions import when

df_BROADBAND_WA_ONLY = df_BROADBAND_WA_ONLY\
        .select("*", when(df_BROADBAND_WA_ONLY.technology == 50, 1)\
        .otherwise(0)\
        .alias("fiber_yn"))\
       
df_BROADBAND_WA_ONLY.show()

+--------------------+---------------+-----+----------+------------+----------+------------+----------+---------------+--------+
|            dba_name|   census_tract|state|technology|ma_downspeed|ma_upspeed|mc_downspeed|mc_upspeed|census_tract_11|fiber_yn|
+--------------------+---------------+-----+----------+------------+----------+------------+----------+---------------+--------+
| StarTouch Broadband|530770018004053|   WA|        70|          25|        25|        1000|      1000|    53077001800|       0|
| StarTouch Broadband|530330304013022|   WA|        70|           0|         0|        1000|      1000|    53033030401|       0|
| StarTouch Broadband|530439604003069|   WA|        70|          25|        25|        1000|      1000|    53043960400|       0|
| StarTouch Broadband|530610402002017|   WA|        70|          25|        25|        1000|      1000|    53061040200|       0|
| StarTouch Broadband|530730102002089|   WA|        70|          25|        25|        1000|     

In [82]:
df_BROADBAND_WA_ONLY.createOrReplaceTempView("broadband_table")


In [None]:
#ALTER TABLE "table_name" DROP "column_name";


broadband_full_table = spark.sql('ALTER TABLE broadband_table DROP ????????
FROM broadband_table')

In [None]:
# df_BROADBAND_WA_ONLY_CT11_Added.orderBy("census_tract", ascending=False).show(40)


In [88]:
mode = "overwrite"
database_name = 'broadband_scoutdb'
hostname = 'ec2-54-186-79-112.us-west-2.compute.amazonaws.com'
url = "jdbc:postgresql://{hostname}:5432/{db}".format(hostname=hostname, db=database_name)
properties = {"user": "postgres","password": "postgres","driver": "org.postgresql.Driver"}
df_BROADBAND_WA_ONLY.write.jdbc(url=url, table="broadband_table_WA_full", properties=properties)

# Census Tract to HPI

In [5]:

    # If PYTHONPATH is not set, findspark and findspark.init() will find it on your machine 
import findspark
findspark.init()

import re
import sys
import spark
from pyspark import SparkContext
from pyspark.sql.types import *
import os
import sys
# schemaString = 'something'

# fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]

from pyspark.sql import SparkSession
spark = SparkSession \
        .builder \
        .master('local') \
        .appName("BroadbandScout")\
        .getOrCreate()

from pyspark.sql.functions import regexp_replace, col

# import pandas as pd

sc = SparkContext.getOrCreate()


pathname_input = 's3a://sparkforinsightproject/HPI_AT_BDL_tract.csv'

pathname_output = 's3a://sparkforinsightproject/database_data/sparkdf_broadband_output_2'

sc = SparkContext.getOrCreate()

def etl_hpi_to_tract(input_data_txt):
    


    hpi_schema = StructType([
        StructField("tract", StringType(), True),
        StructField("state_abbr", StringType(), True),
        StructField("year", StringType(), True),
        StructField("annual_change", StringType(), True),
        StructField("hpi", FloatType(), True),
        StructField("hpi1990", FloatType(), True),
        StructField("hpi2000", FloatType(), True)
        ])


    df_HPI_to_TRACT = spark.read.csv(input_data_txt, header=True, schema=hpi_schema)#\
#                         .withColumn("('tract'",regexp_replace(col("('tract'"), "\(", ""))\
#                         .withColumn("'hpi')",regexp_replace(col("'hpi')"), "\)", "")) 

    df_HPI_to_TRACT = df_HPI_to_TRACT\
                               .withColumnRenamed("tract", "census_tract")\
                               .withColumnRenamed("state_abbr", "state")

    df_HPI_to_TRACT = df_HPI_to_TRACT\
                                       .select(
                                       "census_tract",
                                       "state",
                                       "year",
                                       "hpi")

    return df_HPI_to_TRACT

output_hpi_to_tract = etl_hpi_to_tract(pathname_input)


output_HPI_to_TRACT.show(5)

+------------+-----+----+------+
|census_tract|state|year|   hpi|
+------------+-----+----+------+
| 01001020100|   AL|1998| 100.0|
| 01001020100|   AL|1999| 94.33|
| 01001020100|   AL|2000| 98.45|
| 01001020100|   AL|2001|106.16|
| 01001020100|   AL|2002|113.95|
+------------+-----+----+------+
only showing top 5 rows



In [8]:
df_TRACT_HPI_2017_wa = output_hpi_to_tract.where(col('year').isin({'2017'})).where(col('state').isin({'WA'}))
df_TRACT_HPI_2017_wa.show(10)

+------------+-----+----+------+
|census_tract|state|year|   hpi|
+------------+-----+----+------+
| 53001950100|   WA|2017|121.51|
| 53001950300|   WA|2017|250.55|
| 53001950400|   WA|2017|204.59|
| 53001950500|   WA|2017|218.37|
| 53003960100|   WA|2017|258.93|
| 53003960200|   WA|2017|261.43|
| 53003960300|   WA|2017|172.49|
| 53003960400|   WA|2017|156.04|
| 53003960500|   WA|2017|260.08|
| 53003960600|   WA|2017|240.78|
+------------+-----+----+------+
only showing top 10 rows



In [9]:
left_join_BROADBAND_ZIP_ADDED = df_BROADBAND_WA_ONLY_CT11_Added.join(df_TRACT_HPI_2017_wa, df_BROADBAND_WA_ONLY_CT11_Added.census_tract == df_TRACT_HPI_2017_wa.census_tract)

left_join_BROADBAND_ZIP_ADDED.count()

0

In [77]:
# df_BROADBAND_WA_ONLY_CT11_Added.orderBy("census_tract", ascending=True).show(25)
df_BROADBAND_WA_ONLY_CT11_Added.count()

1378559

In [76]:
mode = "overwrite"
database_name = 'broadband_scoutdb'
hostname = 'ec2-54-186-79-112.us-west-2.compute.amazonaws.com'
url = "jdbc:postgresql://{hostname}:5432/{db}".format(hostname=hostname, db=database_name)
properties = {"user": "postgres","password": "postgres","driver": "org.postgresql.Driver"}
df_BROADBAND_WA_ONLY_CT11_Added.write.jdbc(url=url, table="broadband_table_WA_complete", properties=properties)

In [75]:
df_BROADBAND_WA_ONLY_CT11_Added.orderBy("ma_downspeed", ascending=False).show(25)


+--------------------+---------------+-----+----------+------------+----------+------------+----------+---------------+
|            dba_name|   census_tract|state|technology|ma_downspeed|ma_upspeed|mc_downspeed|mc_upspeed|census_tract_11|
+--------------------+---------------+-----+----------+------------+----------+------------+----------+---------------+
|         CenturyLink|530270015002041|   WA|        50|        1000|      1000|           0|         0|    53027001500|
|   Chelan County PUD|530079604003039|   WA|        50|        1000|       100|           0|         0|    53007960400|
|Douglas County P....|530179508002007|   WA|        30|        1000|      1000|        1000|      1000|    53017950800|
|         CenturyLink|530530604001012|   WA|        50|        1000|      1000|           0|         0|    53053060400|
|           Cable ONE|530039602002004|   WA|        42|        1000|        50|           0|         0|    53003960200|
|         CenturyLink|530110417002002|  

In [67]:
df_BROADBAND_ZIP_WA = df_BROADBAND_WA_ONLY_CT11_Added.join(df_TRACT_HPI_2017_wa, \
    df_BROADBAND_WA_ONLY_CT11_Added.census_tract == df_TRACT_HPI_2017_wa.census_tract, how='left')
df_BROADBAND_ZIP_WA.show(20)

+---------------+-----+----------+------------+----------+------------+----------+---------------+------------+-----+----+----+
|   census_tract|state|technology|ma_downspeed|ma_upspeed|mc_downspeed|mc_upspeed|census_tract_11|census_tract|state|year| hpi|
+---------------+-----+----------+------------+----------+------------+----------+---------------+------------+-----+----+----+
|530019501001055|   WA|        70|          25|        25|        1000|      1000|    53001950100|        null| null|null|null|
|530019501001055|   WA|        60|           5|         1|           0|         0|    53001950100|        null| null|null|null|
|530019501001055|   WA|        70|         100|       100|           0|         0|    53001950100|        null| null|null|null|
|530019501001055|   WA|        60|           0|         0|           0|         0|    53001950100|        null| null|null|null|
|530019501001055|   WA|        70|          15|        15|          15|        15|    53001950100|      

In [18]:
from pyspark.sql.functions import col, count, isnan, lit, sum

def count_not_null(c, nan_as_null=False):
    """Use conversion between boolean and integer
    - False -> 0
    - True ->  1
    """
    pred = col(c).isNotNull() & (~isnan(c) if nan_as_null else lit(True))
    return sum(pred.cast("integer")).alias(c)

# df.agg(*[count_not_null(c) for c in df.columns]).show()

left_join_BROADBAND_ZIP_ADDED.agg(*[count_not_null(c, True) for c in left_join_BROADBAND_ZIP_ADDED.columns]).show()


AnalysisException: u"Reference 'census_tract' is ambiguous, could be: census_tract, census_tract.;"

In [64]:
#!/usr/bin/env python

# Reading the Fannie Mae and Freddie Mac housing datasets, transforming them, reading in the 
# relevant columns to a dataframe, and uniting those datasets into one.

# Data is available here: 

# sc = SparkContext()

import findspark
findspark.init()

import re
import sys
import spark
from pyspark import SparkContext
from pyspark.sql.types import *
import os
import sys

from pyspark.sql import SparkSession
spark = SparkSession \
        .builder \
        .master('local') \
        .appName("BroadbandScout")\
        .getOrCreate()

sc = SparkContext.getOrCreate()

from pyspark.sql.functions import concat, col, lit


#!/usr/bin/env python

# Reading the Fannie Mae and Freddie Mac housing datasets, transforming them, reading in the 
# relevant columns to a dataframe, and uniting those datasets into one.

# Data is available here: 


from pyspark.sql.functions import regexp_replace, col


pathname_FANNIE = 's3a://sparkforinsightproject/fnma_sf2017c_loans.txt'


pathname_FREDDIE = 's3a://sparkforinsightproject/fhlmc_sf2017c_loans.txt'



pathname_output_UNITED = 's3a://sparkforinsightproject/database_data/transformed_fhlmc_fnma_HOUSING_dataset'


def etl_fannie_freddie_data(input_FANNIE_txt, input_FREDDIE_txt):
    
    rdd_FANNIE_data = sc.textFile(input_FANNIE_txt)


    rdd_FANNIE_data = rdd_FANNIE_data.map(lambda x: x.encode('ascii', 'ignore'))\
                            .map(lambda l: l.split())\
                            .map(lambda l: (l[1], l[2] + l[4] + l[5], float(l[6]), int(l[7]), int(l[8]), int(l[11]), int(l[13]), int(l[36]), l[37]))    

    # Column indices wanted: 1- 2+4+5 (Census Tract), 2- 6 ( Minority), 3- 7 (Census Tract Median income), 4- 8 (Local Area Median Income), 
    # 5- 11 (Area Median Family Income), 6- 13 (Acquisition Cost/Original UPB, 7- 36 (Property Type)
    
    
    housing_schema = StructType([
    StructField('_c0', StringType(), True),
    StructField('_c1', StringType(), True),
    StructField("_c2", FloatType(), True),
    StructField("_c3", IntegerType(), True),
    StructField('_c4', IntegerType(), True),
    StructField("_c5", IntegerType(), True),
    StructField("_c6", IntegerType(), True),
    StructField('_c7', StringType(), True),
    StructField('_c8', StringType(), True),
    ])
    
    # Column indices wanted: 2+4+5 (Census Tract), 6 (Percent Minority), 7 (Census Tract Median income) 8 (Local Area Median Income), 11 (Area Median Family Income), 13 (Acquisition Cost/Original UPB, 19, 
    
    df_FANNIE = spark.read.csv(rdd_FANNIE_data, mode='DROPMALFORMED', nullValue='NA', header=False) #, schema=housing_schema)

    
    rdd_FREDDIE_data = sc.textFile(input_FREDDIE_txt)

    rdd_FREDDIE_data = rdd_FREDDIE_data.map(lambda x: x.encode('ascii', 'ignore'))\
                        .map(lambda l: l.split())\
                        .map(lambda l: (l[1], l[2] + l[4] + l[5], float(l[6]), int(l[7]), int(l[8]), int(l[11]), int(l[13]), int(l[36]), l[37]))    
    

    df_FREDDIE = spark.read.csv(rdd_FREDDIE_data, mode='DROPMALFORMED', nullValue='NA', header=False) #, schema=housing_schema)
    
    

    df_FRANNIE_FREDDIE_UNITED = df_FANNIE.union(df_FREDDIE)
    
    df_FRANNIE_FREDDIE_UNITED = df_FRANNIE_FREDDIE_UNITED\
                                       .select(
                                       "_c1",\
                                       "_c2",\
                                       "_c3",\
                                       "_c4",\
                                       "_c5",\
                                       "_c6",\
                                       "_c7")
    

#     # Column indices wanted: 2+4+5 (Census Tract), 6 (Percent Minority), 7 (Census Tract Median income) 8 (Local Area Median Income), 11 (Area Median Family Income), 13 (Acquisition Cost/Original UPB, 19 , 

    
    df_FRANNIE_FREDDIE_UNITED = df_FRANNIE_FREDDIE_UNITED\
                           .withColumnRenamed("_c1", "census_tract")\
                           .withColumnRenamed("_c2", "percent_minority")\
                           .withColumnRenamed("_c3", "ct_median_income")\
                           .withColumnRenamed("_c4", "la_median_income")\
                           .withColumnRenamed("_c5",  "area_media_fam_income")\
                           .withColumnRenamed("_c6",  "acquisition_cost")\
                           .withColumnRenamed("_c7", "property_type")
    
#     df_FRANNIE_FREDDIE_UNITED = df_FRANNIE_FREDDIE_UNITED.withColumn("acquisition_cost", df_FRANNIE_FREDDIE_UNITED["acquisition_cost"].cast(IntegerType()))

#     df_FRANNIE_FREDDIE_UNITED.createOrReplaceTempView("df_housing")
#     df_FRANNIE_FREDDIE_UNITED = spark.sql("select census_tract, cast(percent_minority as float) as percent_minority, cast(ct_median_income as int) as ct_median_income, cast(la_median_income as int) as la_median_income, cast(area_media_fam_income as int) as area_media_fam_income, cast(acquisition_cost as int) as acquisition_cost, cast(property_type cast as int) as property_type from df_housing")
    
    
#     df_FRANNIE_FREDDIE_UNITED = df_FRANNIE_FREDDIE_UNITED.select(concat(col("state_code"), col("county_code"), col("census_code")))


    return df_FRANNIE_FREDDIE_UNITED

    
    
df_united_FANNIE_FREDDIE = etl_fannie_freddie_data(pathname_FANNIE, pathname_FREDDIE)


df_united_FANNIE_FREDDIE.show(5)


+--------------+----------------+----------------+----------------+---------------------+----------------+-------------+
|  census_tract|percent_minority|ct_median_income|la_median_income|area_media_fam_income|acquisition_cost|property_type|
+--------------+----------------+----------------+----------------+---------------------+----------------+-------------+
| '31109002300'|           17.71|           60739|           70200|                72000|           88000|            2|
| '06071009908'|           64.58|           51667|           61507|                63200|          218000|            2|
| '34039036302'|           11.32|          124306|           81054|                89400|          273000|            2|
| '12081001404'|           25.28|           53357|           62814|                65500|          279000|            2|
| '28121020206'|           20.31|           84026|           56700|                63200|          127000|            2|
+--------------+----------------

In [59]:
mode = "overwrite"
database_name = 'broadband_scoutdb'
hostname = 'ec2-54-186-79-112.us-west-2.compute.amazonaws.com'
url = "jdbc:postgresql://{hostname}:5432/{db}".format(hostname=hostname, db=database_name)
properties = {"user": "postgres","password": "postgres","driver": "org.postgresql.Driver"}
df_united_FANNIE_FREDDIE.write.jdbc(url=url, table="housing_table_test", properties=properties)

In [66]:
left_join_BROADBAND_HOUSING = df_BROADBAND_WA_ONLY_CT11_Added.join(df_united_FANNIE_FREDDIE, df_BROADBAND_WA_ONLY_CT11_Added.census_tract == df_united_FANNIE_FREDDIE.census_tract, how='left')
left_join_BROADBAND_HOUSING.show()

+---------------+-----+----------+------------+----------+------------+----------+---------------+------------+----------------+----------------+----------------+---------------------+----------------+-------------+
|   census_tract|state|technology|ma_downspeed|ma_upspeed|mc_downspeed|mc_upspeed|census_tract_11|census_tract|percent_minority|ct_median_income|la_median_income|area_media_fam_income|acquisition_cost|property_type|
+---------------+-----+----------+------------+----------+------------+----------+---------------+------------+----------------+----------------+----------------+---------------------+----------------+-------------+
|530019501001055|   WA|        70|          25|        25|        1000|      1000|    53001950100|        null|            null|            null|            null|                 null|            null|         null|
|530019501001055|   WA|        60|           5|         1|           0|         0|    53001950100|        null|            null|        