In [1]:
# Enable IPython's autoreload extension in mode 2, so imported modules are
# reloaded before each cell runs and edits to local .py files are picked up.
%load_ext autoreload
%autoreload 2

In [2]:
import json
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, DataFrameWriter, DataFrameReader
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [3]:
from sqlalchemy import Table, Column, Integer, String, MetaData, create_engine
from geoalchemy2 import Geometry

In [4]:
def get_config(path="../config.json"):
    '''Load the JSON configuration file and return its parsed contents.

    Args:
        path: Location of the JSON config file. Defaults to
            "../config.json", resolved against the current working
            directory.

    Returns:
        The decoded JSON document, exactly as produced by ``json.load``.

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if the file does not contain valid JSON.
    '''
    # Read as UTF-8 explicitly so the result does not depend on the
    # platform's default locale encoding.
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)

In [5]:
def get_spark_conf(config):
    '''Build the SparkConf used by this notebook.

    The application name is fixed to 'yelp'; the master URL is taken from
    config["spark"]["master_url"].
    '''
    return (
        SparkConf()
        .setAppName('yelp')
        .set('spark.master', config["spark"]["master_url"])
    )

In [6]:
def get_pg_props(config):
    '''Assemble the JDBC connection properties for PostgreSQL.

    User and password are read from config["postgres"]; the driver class
    is always "org.postgresql.Driver".
    '''
    pg_section = config["postgres"]
    connection_props = dict(
        user=pg_section["user"],
        password=pg_section["password"],
    )
    connection_props["driver"] = "org.postgresql.Driver"
    return connection_props

In [7]:
# Name of the config["postgres"][<env>] section that the PostgreSQL write
# helpers in this notebook read their connection URLs from.
env = "development"

In [8]:
def getdf(sql_context, config):
    '''Read the Yelp JSON data into a DataFrame.

    The source location is config["yelp"]["s3"]. The data is returned as
    read; no filtering or column selection happens here.
    '''
    reader = sql_context.read
    return reader.json(config["yelp"]["s3"])

In [9]:
# Load the settings and start the Spark context for this notebook.
config = get_config()
spark_conf = get_spark_conf(config)
# getOrCreate keeps this cell re-runnable: constructing a second
# SparkContext in the same kernel raises, whereas getOrCreate returns the
# context that is already running (spark_conf is only applied on creation).
sc = SparkContext.getOrCreate(conf=spark_conf)

In [10]:
# SQL entry point on top of the Spark context; the cells below use it to
# read the JSON source and to build DataFrames from local data.
sql_context = SQLContext(sc)

In [11]:
# Yelp records as read from config["yelp"]["s3"]; every later step derives
# its data from this DataFrame.
raw_df = getdf(sql_context, config)

In [12]:
def get_category_set(raw_df):
    '''Return the set of distinct category names found in raw_df.

    Each row's ``categories`` value is treated as either None or a single
    string of names separated by ", ". Rows with None contribute nothing.

    Args:
        raw_df: DataFrame whose rows expose a ``categories`` attribute.

    Returns:
        set: every distinct category name across all rows.
    '''
    def split_categories(row):
        # Rows without categories contribute no names.
        if row.categories is None:
            return []
        return row.categories.split(", ")

    # Flatten and de-duplicate before collecting, so only the distinct names
    # (rather than one list per input row) are brought back to the driver.
    distinct_names = raw_df.rdd.flatMap(split_categories).distinct().collect()
    return set(distinct_names)

In [13]:
# Sort the names: category ids are later assigned by list position, and the
# iteration order of a set of strings is not stable between interpreter
# sessions, so an unsorted list would give different ids on every run.
all_categories = sorted(get_category_set(raw_df))
print(len(all_categories))

1305


In [14]:
def write_to_pg(df, table, config):
    '''Write df to the PostgreSQL table `table` over JDBC.

    The JDBC URL is config["postgres"][env]["jdbc"], where `env` is the
    module-level environment name; credentials and driver come from
    get_pg_props(config). The write uses Spark's 'overwrite' save mode.
    '''
    jdbc_url = config["postgres"][env]["jdbc"]
    connection_props = get_pg_props(config)
    writer = df.write
    writer.jdbc(
        url=jdbc_url,
        table=table,
        mode='overwrite',
        properties=connection_props,
    )

In [None]:
def write_category_df_to_pg(all_categories, config):
    '''Write the category lookup table to PostgreSQL and return it.

    Each name in all_categories is paired with its list position as the
    integer id. The resulting (id, name) DataFrame is built with the
    module-level sql_context and written to the "categories" table.

    Returns:
        The (id, name) DataFrame that was written.
    '''
    category_schema = StructType([
        StructField("id", IntegerType(), False),
        StructField("name", StringType(), False),
    ])
    id_name_pairs = list(enumerate(all_categories))
    category_table = sql_context.createDataFrame(id_name_pairs, category_schema)
    write_to_pg(category_table, "categories", config)
    return category_table

In [None]:
# Persist the category lookup table; the returned DataFrame is passed on to
# the yelp-to-category mapping step below.
category_df = write_category_df_to_pg(all_categories, config)

In [15]:
def write_yelp_df_to_pg(raw_df, config):
    '''Add a generated id to every record and write the "yelp" table.

    An "id" column is produced with monotonically_increasing_id(). The id
    plus the descriptive columns are written to the "yelp" table; the same
    columns plus "categories" are returned for the follow-up steps.

    NOTE(review): the returned DataFrame is lazy, so the ids are presumably
    recomputed whenever it is evaluated again -- confirm they match the ids
    that were written here.
    '''
    business_columns = [
        "id", "name", "latitude", "longitude", "stars",
        "review_count", "address", "city", "state",
    ]
    with_ids = raw_df.withColumn("id", monotonically_increasing_id())
    write_to_pg(with_ids.select(*business_columns), "yelp", config)
    return with_ids.select(*(business_columns + ["categories"]))

In [None]:
def write_yelp2category_to_pg(yelp_df, category_df, config):
    '''Write the yelp-to-category link table to PostgreSQL and return it.

    Every row of yelp_df with a non-None "categories" string (names
    separated by ", ") yields one (yelp_id, category_id) pair per name.
    Category ids are looked up by name in category_df.

    Args:
        yelp_df: DataFrame with "id" and "categories" columns.
        category_df: DataFrame with "id" and "name" columns.
        config: Parsed configuration, forwarded to write_to_pg.

    Returns:
        The (yelp_id, category_id) DataFrame written to "yelp2category".

    Raises:
        KeyError: raised inside the Spark job when a category name is not
            present in category_df.
    '''
    # Use the yelp_df handed in by the caller. Rebuilding it here from the
    # global raw_df would ignore the argument and overwrite the "yelp"
    # table a second time.
    category_dict = {row.name: row.id for row in category_df.collect()}

    def categories_to_ids(row):
        # Rows without categories produce no link rows.
        if row["categories"] is None:
            return []
        return [(row.id, category_dict[cat]) for cat in row["categories"].split(", ")]

    yelp2cat_rdd = yelp_df.rdd.flatMap(categories_to_ids)
    schema = StructType([
        StructField("yelp_id", LongType(), False),
        StructField("category_id", IntegerType(), False)
    ])
    yelp2cat_df = sql_context.createDataFrame(yelp2cat_rdd, schema)

    write_to_pg(yelp2cat_df, "yelp2category", config)
    return yelp2cat_df

In [16]:
# Write the "yelp" table and keep the id-tagged DataFrame (including
# "categories") for the mapping and geometry steps.
yelp_df = write_yelp_df_to_pg(raw_df, config)

In [17]:
# Build and persist the (yelp_id, category_id) link table.
write_yelp2category_to_pg(yelp_df, category_df, config)

NameError: name 'write_yelp2category_to_pg' is not defined

In [43]:
def write_yelp_geo_to_pg(yelp_df):
    '''Insert one POINT geometry per located record into "yelp_geo".

    Rows whose longitude or latitude is None are skipped. The connection
    URL is read from the module-level config at
    config["postgres"][env]["url"]. Rows are inserted, not replaced.

    Args:
        yelp_df: DataFrame whose rows expose id, longitude and latitude.
    '''
    def has_coordinates(row):
        return row.longitude is not None and row.latitude is not None

    def insertion_dict(row):
        # The point text is written as "x y", i.e. longitude first.
        return {"yelp_id": row.id, "location": f'POINT({row.longitude} {row.latitude})'}

    rows = yelp_df.rdd.filter(has_coordinates).map(insertion_dict).collect()
    if not rows:
        # Nothing to insert; avoid opening a connection at all.
        return

    metadata = MetaData()
    geo_table = Table('yelp_geo', metadata,
        Column('yelp_id', Integer, primary_key=True),
        Column('location', Geometry('POINT'))
    )
    engine = create_engine(config["postgres"][env]["url"])
    try:
        # engine.begin() commits on success, rolls back on error, and always
        # returns the connection, so the insert is neither left uncommitted
        # nor the connection leaked.
        with engine.begin() as conn:
            conn.execute(geo_table.insert(), rows)
    finally:
        # The engine is local to this call; release its pooled connections.
        engine.dispose()

In [44]:
# Insert the point geometries for all records that have coordinates.
write_yelp_geo_to_pg(yelp_df)