*The following should read Python3.7 and Spark 2.4.0. If not, change the kernel to spark_2_4_0*

If no such kernel exists, open a **bash** terminal (the default is plain sh). Then run `setup.sh`

In [1]:
!python --version
!pyspark --version

Python 3.7.12
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/
                        
Using Scala version 2.11.12, OpenJDK 64-Bit Server VM, 1.8.0_312
Branch 
Compiled by user  on 2018-10-29T06:22:05Z
Revision 
Url 
Type --help for more information.


# Export Database
The purpose of this script is to dump the contents of the housing-db database.

## Options
1. `DUMP_PARQUET` Dumps the database in Parquet form. Perfect for importing into another program like R
2. `DUMP_EXCEL` Dumps the database in Excel form. Perfect for human analysis after the fac
3. `UPLOAD_DEST` Uploads either the parquet, excel, or both files to S3 if `UPLOAD_DEST` is set to a string, or does nothing if set to the empty string

In [2]:
# set settings here
BUCKET = "tcceval-data"
DATABASE_CACHE_FOLDER = "_db_cache"
DUMP_PARQUET_FOLDER = "craigslist_parquet"
DUMP_EXCEL_FOLDER = "craigslist_excel"

In [3]:
# Establishes Spark session
ADDITIONAL_ARTIFACTS = [
    # Apache Sedona handles spatial queries
    'org.apache.sedona:sedona-python-adapter-2.4_2.11:1.2.0-incubating',
    'org.apache.sedona:sedona-viz-2.4_2.11:1.2.0-incubating',
    'org.datasyslab:geotools-wrapper:1.1.0-25.2',
    # JDBC connector for MySQL
    'mysql:mysql-connector-java:8.0.30'
]

import sagemaker_pyspark
from pyspark.sql import SparkSession
from sedona.register import SedonaRegistrator
from sedona.utils import SedonaKryoRegistrator, KryoSerializer

classpath = ":".join(sagemaker_pyspark.classpath_jars())
print(classpath)
spark = SparkSession.builder \
    .config("spark.driver.extraClassPath", classpath) \
    .config("spark.executor.extraClassPath", classpath) \
    .config("spark.serializer", KryoSerializer.getName) \
    .config("spark.kryo.registrator", SedonaKryoRegistrator.getName) \
    .config('spark.jars.packages',
           ','.join(ADDITIONAL_ARTIFACTS)) \
    .getOrCreate()
SedonaRegistrator.registerAll(spark)
# suppresses a bunch of AbortableS3Stream warnings
spark.sparkContext.setLogLevel("ERROR")

/home/ec2-user/bash/envs/spark_2_4_0/lib/python3.7/site-packages/sagemaker_pyspark/jars/aws-java-sdk-core-1.11.835.jar:/home/ec2-user/bash/envs/spark_2_4_0/lib/python3.7/site-packages/sagemaker_pyspark/jars/aws-java-sdk-kms-1.11.835.jar:/home/ec2-user/bash/envs/spark_2_4_0/lib/python3.7/site-packages/sagemaker_pyspark/jars/aws-java-sdk-s3-1.11.835.jar:/home/ec2-user/bash/envs/spark_2_4_0/lib/python3.7/site-packages/sagemaker_pyspark/jars/aws-java-sdk-sagemaker-1.11.835.jar:/home/ec2-user/bash/envs/spark_2_4_0/lib/python3.7/site-packages/sagemaker_pyspark/jars/aws-java-sdk-sagemakerruntime-1.11.835.jar:/home/ec2-user/bash/envs/spark_2_4_0/lib/python3.7/site-packages/sagemaker_pyspark/jars/aws-java-sdk-sts-1.11.835.jar:/home/ec2-user/bash/envs/spark_2_4_0/lib/python3.7/site-packages/sagemaker_pyspark/jars/hadoop-annotations-2.8.1.jar:/home/ec2-user/bash/envs/spark_2_4_0/lib/python3.7/site-packages/sagemaker_pyspark/jars/hadoop-auth-2.8.1.jar:/home/ec2-user/bash/envs/spark_2_4_0/lib/pytho

Ivy Default Cache set to: /home/ec2-user/.ivy2/cache
The jars for the packages stored in: /home/ec2-user/.ivy2/jars
:: loading settings :: url = jar:file:/home/ec2-user/bash/envs/spark_2_4_0/lib/python3.7/site-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.sedona#sedona-python-adapter-2.4_2.11 added as a dependency
org.apache.sedona#sedona-viz-2.4_2.11 added as a dependency
org.datasyslab#geotools-wrapper added as a dependency
mysql#mysql-connector-java added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-0bd2c31e-9edf-4149-9afd-6ca8442897e3;1.0
	confs: [default]
	found org.apache.sedona#sedona-python-adapter-2.4_2.11;1.2.0-incubating in central
	found org.locationtech.jts#jts-core;1.18.0 in central
	found org.wololo#jts2geojson;0.16.1 in central
	found com.fasterxml.jackson.core#jackson-databind;2.12.2 in central
	found com.fasterxml.jackson.core#jackson-annotations;2.12.2 in central
	found com.fasterxml.

2022-08-30 22:13:40 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
                                                                                

In [4]:
# Establishes geojson database
import os
import geopandas as gpd
import pandas as pd

def load_tract(name):
    df = gpd.read_file(f'tracts/{name}.geojson')
    df['region'] = name
    for col in df.columns:
        if col.lower() == 'group':
            df['group'] = df[col].astype(int)
        elif col.lower() == 'geoid':
            df['geoid'] = df[col].astype(str)
    
    return df[['group', 'region', 'geometry', 'geoid']]

tracts_gpd = gpd.GeoDataFrame(pd.concat(map(load_tract, ['fresno', 'la', 'ontario', 'pacoima']), ignore_index=True))
tracts_gpd.info()
# upload dataframe to spark
tracts = spark.createDataFrame(tracts_gpd)
tracts.createOrReplaceTempView("tracts")
tracts.printSchema()
tracts.show()

ERROR 1: PROJ: proj_create_from_database: Open of /home/ec2-user/bash/envs/spark_2_4_0/share/proj failed


<class 'geopandas.geodataframe.GeoDataFrame'>
RangeIndex: 158 entries, 0 to 157
Data columns (total 4 columns):
 #   Column    Non-Null Count  Dtype   
---  ------    --------------  -----   
 0   group     158 non-null    int64   
 1   region    158 non-null    object  
 2   geometry  158 non-null    geometry
 3   geoid     158 non-null    object  
dtypes: geometry(1), int64(1), object(2)
memory usage: 5.1+ KB
root
 |-- group: long (nullable = true)
 |-- region: string (nullable = true)
 |-- geometry: geometry (nullable = true)
 |-- geoid: string (nullable = true)

+-----+------+--------------------+-----------+
|group|region|            geometry|      geoid|
+-----+------+--------------------+-----------+
|    0|fresno|POLYGON ((-119.74...|06019001202|
|    0|fresno|POLYGON ((-119.75...|06019001304|
|    0|fresno|POLYGON ((-119.73...|06019001407|
|    0|fresno|POLYGON ((-119.75...|06019002800|
|    0|fresno|POLYGON ((-119.75...|06019003202|
|    0|fresno|POLYGON ((-119.86...|06019003

In [5]:
def try_parse(str, meth):
    if str is None:
        return 0
    str = str.strip()
    str = str.replace(',', '')
    if len(str) == 0:
        return 0
    try:
        return meth(str)
    except ValueError:
        # print("Could not turn into number: "+str)
        # i'm getting a bunch of 'buttonclass'
        return 0

def update_schema(df):
    # drop useless columns
    df = df.drop(columns=['begts', 'endts', 'region'])
    # perform casts
    df['pid'] = df['pid'].map(lambda x: try_parse(x, int))
    df['repostid'] = df['repostid'].astype(str)
    df['lat'] = df['lat'].map(lambda x: try_parse(x, float))
    df['lng'] = df['lng'].map(lambda x: try_parse(x, float))
    df['accuracy'] = df['accuracy'].map(lambda x: try_parse(x, float))
    df['price'] = df['price'].map(lambda x: try_parse(x, float))
    df['sqft'] = df['sqft'].map(lambda x: try_parse(x, int))
    df['beds'] = df['beds'].map(lambda x: try_parse(x, int))
    df['dt'] = pd.to_datetime(df['dt'])
    return df

In [6]:
# Connects to database using standard python. Using Spark leads to OOM's
# dumps entire database into filesystem

from sqlalchemy import create_engine
import pandas as pd
import shutil
import time

TIMEOUT = 100*60*60
engine = create_engine('mysql+pymysql://luskincenter:tccproject@housing-site-db.cxxl1so9sozw.us-west-1.rds.amazonaws.com:3306/housing_site_db', echo=True, 
                       connect_args={'connect_timeout': TIMEOUT})
table_status = pd.read_sql_query("SHOW TABLE STATUS LIKE 'craigslist_table'", engine)
print(table_status)
total_rows = table_status['Rows'][0]
print("TOTAL ROW COUNT (guess)", total_rows)

2022-08-30 22:13:49,909 INFO sqlalchemy.engine.Engine SELECT DATABASE()
2022-08-30 22:13:49,910 INFO sqlalchemy.engine.Engine [raw sql] {}
2022-08-30 22:13:49,918 INFO sqlalchemy.engine.Engine SELECT @@sql_mode
2022-08-30 22:13:49,918 INFO sqlalchemy.engine.Engine [raw sql] {}
2022-08-30 22:13:49,921 INFO sqlalchemy.engine.Engine SELECT @@lower_case_table_names
2022-08-30 22:13:49,922 INFO sqlalchemy.engine.Engine [raw sql] {}
2022-08-30 22:13:49,924 INFO sqlalchemy.engine.Engine SHOW TABLE STATUS LIKE 'craigslist_table'
2022-08-30 22:13:49,925 INFO sqlalchemy.engine.Engine [raw sql] {}
               Name  Engine  Version Row_format      Rows  Avg_row_length  \
0  craigslist_table  InnoDB       10    Dynamic  10107718            1842   

   Data_length  Max_data_length  Index_length  Data_free Auto_increment  \
0  18620596224                0             0    5242880           None   

          Create_time         Update_time Check_time          Collation  \
0 2019-10-20 18:24:52 202

In [None]:
%%time
import os
import uuid
import shutil
from tqdm.autonotebook import tqdm
from s3fs import S3FileSystem

# delete existing database cache if found
s3fs = S3FileSystem()
try:
    s3fs.rm(f'{BUCKET}/{DATABASE_CACHE_FOLDER}/', recursive=True)
except:
    pass

rows_written = 0
with engine.connect().execution_options(stream_results=True) as conn, tqdm(total=total_rows) as pbar:
    for chunk in pd.read_sql_query("SELECT * FROM craigslist_table", conn, chunksize=32*1024):
        chunk = update_schema(chunk)
        rows_written += len(chunk)
        chunk.to_parquet(f's3://{BUCKET}/{DATABASE_CACHE_FOLDER}/part{rows_written}.snappy.parquet', compression='snappy')
        
        pbar.update(len(chunk))
# print(f'TOTAL EXECUTION TIME: {(time.monotonic() - start_time) / 60}m')

  after removing the cwd from sys.path.
  0%|          | 0/10107718 [00:00<?, ?it/s]

2022-08-30 22:13:52,117 INFO sqlalchemy.engine.Engine SELECT * FROM craigslist_table
2022-08-30 22:13:52,119 INFO sqlalchemy.engine.Engine [raw sql] {}


 91%|█████████▏| 9240576/10107718 [12:35<01:05, 13187.69it/s]

In [None]:
%%time
# load in the craigslist data into Spark
craigslist = spark.read.option('mergeSchema', True).parquet("s3a://tcceval-data/_db_cache/")
craigslist.createOrReplaceTempView("craigslist")
# REMINDER: IT'S LONGITUDE, LATITUDE
craigslist = spark.sql("SELECT *, ST_POINT(lng, lat) as geometry FROM craigslist")
craigslist.printSchema()
craigslist.show()

In [None]:
# define a keyword search regex for detecting ADU's
import re
adu_keywords = [
    'accessory dwelling',
    'accessory dwelling unit',
    'ADU',
    'ancillary unit',
    'carriage house'
    'cottage',
    'granny flat',
    'granny unit',
    'in-law',
    'in-law suite',
    'in-law unit',
    'Converted garage',
    'Garage conversion',
    'Garage for rent',
    'Casita',
    'Renta garage',
    'Garage convertido',
    'Renta cuarto'
]
def transform_keywords(kwd):
    # escapes each keyword and adds a whitespace marker to the end
    return re.escape(kwd)+'\s'
# make case insensitive and union all the keywords
ADU_REGEX = '(?i)(' + '|'.join(map(transform_keywords, adu_keywords)) + ')'
print(ADU_REGEX)

In [None]:
# create a new dataframe, "properties", that has all the columns of interest
from pyspark.sql.window import Window
import pyspark.sql.functions as F

# tracts << craigslist, so we broadcast tracts for the join
# drop geometry columns since they won't serialize well    
properties = craigslist.alias('craigslist').join(F.broadcast(tracts).alias('tracts'), F.expr("ST_CONTAINS(tracts.geometry, craigslist.geometry)")) \
    .drop('craigslist.geometry') \
    .drop('geometry')
properties = properties \
    .where(F.expr('craigslist.price > 0 AND craigslist.sqft > 0'))

# we want to use either the URL defined shelter type, or the adu type if our regex gets a match
properties = properties.withColumn('url_type', F.regexp_extract('url', r'.*\/(\w+)\/d\/.*', 1))
properties = properties.withColumn('is_adu', F.col('posttext').rlike(ADU_REGEX))
properties = properties.withColumn("type",
  F.when(F.col("is_adu") == F.lit(True), F.lit('adu')) \
    .otherwise(F.col("url_type"))
)

# counter landlord spam
properties = properties.dropDuplicates(['pid'])
properties = properties.dropDuplicates(['price', 'address', 'lat', 'lng'])

# calculate percentiles over region and type for use in removing outliers
properties = properties.withColumn('pct_price', F.percent_rank().over(Window.partitionBy('region', 'type').orderBy('price')))
properties = properties.withColumn('pct_sqft', F.percent_rank().over(Window.partitionBy('region', 'type').orderBy('sqft')))

properties.createOrReplaceTempView("properties")
properties.explain()
properties.printSchema()

In [None]:
%%time
if DUMP_PARQUET_FOLDER is None:
    exit()
# delete DUMP_PARQUET_FOLDER if exists
try:
    s3fs.rm(f'{BUCKET}/{DUMP_PARQUET_FOLDER}', recursive=True)
except:
    pass

properties = properties.repartition(12)
properties.write \
    .option("compression", "gzip") \
    .parquet(f's3a://{BUCKET}/{DUMP_PARQUET_FOLDER}/')

In [None]:
%%time
if DUMP_EXCEL_FOLDER is None:
    exit()
# delete DUMP_PARQUET_FOLDER if exists
try:
    s3fs.rm(f'{BUCKET}/{DUMP_EXCEL_FOLDER}', recursive=True)
except:
    pass

SHELTER_TYPES_ALIAS = {
    'reo': 'Real_Estate_Owner',
    'sub': 'Sublet_Temporary',
    'apa': 'Apartment',
    'reb': 'Real_Estate_Broker',
    'roo': 'Rooms_Shares',
    'vac': 'Vacation_Rentals',
    'adu': 'Auxiliary_Dwelling_Unit'
}
prop_df = properties.repartition("region", "type")
prop_df.cache()
for region in tracts_gpd['region'].unique():
    for shelter_type in SHELTER_TYPES_ALIAS.keys():
        df = prop_df.filter( (prop_df['region'] == region) & (prop_df['type'] == shelter_type) )
        df = df.toPandas()
        df.to_excel(f"s3://{BUCKET}/{DUMP_EXCEL_FOLDER}/{region}-{SHELTER_TYPES_ALIAS[shelter_type]}.xlsx")
prop_df.unpersist()

2022-08-30 22:12:38
Full thread dump OpenJDK 64-Bit Server VM (25.312-b07 mixed mode):

"Executor task launch worker for task 12748" #831 daemon prio=5 os_prio=0 tid=0x00007f60dc0b0000 nid=0x28b waiting on condition [0x00007f60a2107000]
   java.lang.Thread.State: TIMED_WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x00000000c047d448> (a java.util.concurrent.SynchronousQueue$TransferStack)
	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
	at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
	at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
	at java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:941)
	at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1073)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExe