In [66]:
#!/usr/bin/python3
import json
import time
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, from_json, explode, split, posexplode, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, BooleanType, FloatType, IntegerType, ArrayType

In [67]:
"""
We want to create two tables: one table that stores the event of calling the API,
and one table to which the results of the API call are appended.
"""
def aggregate_request_event_schema():
    """
    Build the Spark schema used to parse one request event from its JSON text.

    Every field is declared as a nullable string:

    root
     |-- Accept: string (nullable = true)
     |-- Content-Length: string (nullable = true)
     |-- Content-Type: string (nullable = true)
     |-- Host: string (nullable = true)
     |-- User-Agent: string (nullable = true)
     |-- zipcodes: string (nullable = true)
     |-- event_data: string (nullable = true)
     |-- event_type: string (nullable = true)
     |-- query_timestamp: string (nullable = true)
    """
    # Field order here is the column order of the resulting struct.
    field_names = (
        "Accept",
        "Content-Length",
        "Content-Type",
        "Host",
        "User-Agent",
        "zipcodes",
        "event_data",
        "event_type",
        "query_timestamp",
    )
    return StructType(
        [StructField(field_name, StringType(), True) for field_name in field_names]
    )

@udf('boolean')
def is_zipcode_event(event_as_json):
    """
    UDF for filtering events: keep events whose "event_type" starts with "get_".

    Parameters
    ----------
    event_as_json : str or None
        The event serialized as JSON text.

    Returns
    -------
    bool
        True when the text decodes to a JSON object whose "event_type" is a
        string starting with "get_". False for everything else, including
        null input, malformed JSON, non-object payloads and events without
        an "event_type".
    """
    try:
        event = json.loads(event_as_json)
    except (TypeError, ValueError):
        # TypeError: null value; ValueError: malformed JSON (JSONDecodeError
        # is a ValueError). One bad record should be filtered out, not abort
        # the whole Spark job.
        return False
    if not isinstance(event, dict):
        # Valid JSON that is not an object (list, number, ...) has no event_type.
        return False
    event_type = event.get("event_type")
    # A missing or non-string event_type would make .startswith() raise.
    return isinstance(event_type, str) and event_type.startswith("get_")

@udf('boolean')
def is_company_event(event_as_json):
    """
    UDF for filtering events: keep the "get_yelp_data" and "get_zillow_data" events.

    Parameters
    ----------
    event_as_json : str or None
        The event serialized as JSON text.

    Returns
    -------
    bool
        True when the text decodes to a JSON object whose "event_type" is
        "get_yelp_data" or "get_zillow_data". False for everything else,
        including null input, malformed JSON and non-object payloads.
    """
    try:
        event = json.loads(event_as_json)
    except (TypeError, ValueError):
        # TypeError: null value; ValueError: malformed JSON (JSONDecodeError
        # is a ValueError). Drop the record instead of failing the job.
        return False
    if not isinstance(event, dict):
        # Valid JSON that is not an object has no .get() / event_type.
        return False
    event_type = event.get("event_type")
    # Equality checks (rather than `in`) so an unhashable event_type value
    # simply compares unequal instead of raising.
    return event_type == "get_yelp_data" or event_type == "get_zillow_data"

@udf('boolean')
def is_zipcode_equal(zipcode1, zipcode2):
    """
    UDF that compares two zipcode values with ``==`` and returns the result.
    """
    same_zipcode = (zipcode1 == zipcode2)
    return same_zipcode

In [68]:
    # Open (or reuse) the Spark session, with Hive support enabled.
    spark = (
        SparkSession.builder
        .appName("ExtractEventsJob")
        .enableHiveSupport()
        .getOrCreate()
    )

    # Batch-read the "events" Kafka topic from the earliest to the latest offset.
    # The former .option("multiline", "true") was dropped: it is a JSON/CSV reader
    # option, not one the Kafka source documents, and it wrongly suggested that it
    # influenced how the payload is parsed.
    raw_events = (
        spark.read
        .format("kafka")
        .option("kafka.bootstrap.servers", "kafka:29092")
        .option("subscribe", "events")
        .option("startingOffsets", "earliest")
        .option("endingOffsets", "latest")
        .load()
    )

In [69]:
    # Keep the events accepted by is_zipcode_event, then expose the raw JSON text,
    # the Kafka timestamp (as a string) and every parsed field as columns.
    zipcode_data = (
        raw_events
        .filter(is_zipcode_event(raw_events['value'].cast('string')))
        .select(
            raw_events['value'].cast('string').alias('raw_event'),
            raw_events['timestamp'].cast('string'),
            from_json(
                raw_events['value'].cast('string'),
                aggregate_request_event_schema(),
            ).alias('json'),
        )
        .select('raw_event', 'timestamp', 'json.*')
    )

    zipcode_data.show(5)

+--------------------+--------------------+------+--------------+------------+-----------------+---------------+--------+--------------------+---------------+---------------+
|           raw_event|           timestamp|Accept|Content-Length|Content-Type|             Host|     User-Agent|zipcodes|          event_data|     event_type|query_timestamp|
+--------------------+--------------------+------+--------------+------------+-----------------+---------------+--------+--------------------+---------------+---------------+
|{"event_type": "g...|2021-12-06 15:43:...|   */*|          null|        null|user1.comcast.com|ApacheBench/2.3|   92354|[{"id":"pcPeziE1S...|  get_yelp_data|15:43:59.133894|
|{"event_type": "g...|2021-12-06 17:29:...|   */*|          null|        null|user1.comcast.com|ApacheBench/2.3|   92354|[{"id":"pcPeziE1S...|  get_yelp_data|17:29:40.456872|
|{"event_type": "g...|2021-12-06 17:35:...|   */*|          null|        null|user1.comcast.com|ApacheBench/2.3|   92557|[{"z

In [None]:
# Write raw data to Hadoop instance.
# Overwrite any previous export so the cell can be re-run: with the default save
# mode the write raises when the target path already exists.
zipcode_data.write.mode("overwrite").parquet("/tmp/raw-requests")

In [24]:
# Show the parsed events accepted by is_company_event.
# The filter runs first, while the raw Kafka `value` column it reads is still part
# of the DataFrame; filtering after select('json.*') referenced a column that the
# projection had already dropped.
(
    raw_events
    .filter(is_company_event(raw_events.value.cast('string')))
    .select(
        from_json(
            raw_events.value.cast('string'),
            aggregate_request_event_schema(),
        ).alias('json')
    )
    .select('json.*')
    .show()
)

+------+--------------+------------+-----------------+---------------+--------+--------------------+-------------+---------------+
|Accept|Content-Length|Content-Type|             Host|     User-Agent|zipcodes|          event_data|   event_type|query_timestamp|
+------+--------------+------------+-----------------+---------------+--------+--------------------+-------------+---------------+
|   */*|          null|        null|user1.comcast.com|ApacheBench/2.3|   92354|[{"id":"pcPeziE1S...|get_yelp_data|15:43:59.133894|
+------+--------------+------------+-----------------+---------------+--------+--------------------+-------------+---------------+



In [28]:
# Take the `event_data` JSON text of every company event and read it back as its
# own DataFrame; no schema is supplied, so the JSON reader infers one.
df = (
    spark.read
    .option("multiline", "true")
    .json(
        raw_events
        .filter(is_company_event(raw_events.value.cast('string')))
        .select(
            from_json(
                raw_events.value.cast('string'),
                aggregate_request_event_schema(),
            ).alias('json')
        )
        .select('json.*')
        .rdd
        .map(lambda parsed_event: parsed_event.event_data)
    )
)
df.show(5)

+--------------------+--------------------+--------------------+---------------------+--------------+---------------+--------------------+--------------------+---------+-------------------+-----------------+-----------------+-------------+----------------+------------------------+--------------+-----------------+--------------------+-----------------+---------------+-----+-----------+------+------------+--------------------+--------------------+
|               alias|          categories|coordinates.latitude|coordinates.longitude| display_phone|       distance|                  id|           image_url|is_closed|  location.address1|location.address2|location.address3|location.city|location.country|location.display_address|location.state|location.zip_code|                name|parent_categories|          phone|price|price_count|rating|review_count|        transactions|                 url|
+--------------------+--------------------+--------------------+---------------------+--------------

In [None]:
# Write raw data to Hadoop instance.
# Overwrite any previous export so the cell can be re-run: with the default save
# mode the write raises when the target path already exists.
df.write.mode("overwrite").parquet("/tmp/company-data")

In [135]:
# Explicit schema for the JSON array held in `event_data`: an array of flat
# structs. The dotted names ('coordinates.latitude', 'location.city', ...) are
# literal field names, not nested structs.
schema = ArrayType(
                    StructType([
                        StructField('id', StringType(), True),
                        StructField('alias', StringType(), True),
                        StructField('name', StringType(), True),
                        StructField('image_url', StringType(), True),
                        StructField('is_closed', BooleanType(), True),
                        StructField('url', StringType(), True),
                        StructField('review_count', IntegerType(), True),
                        StructField('categories', StringType(), True),
                        StructField('rating', FloatType(), True),
                        StructField('transactions', StringType(), True),
                        StructField('price', StringType(), True),
                        StructField('phone', StringType(), True),
                        StructField('display_phone', StringType(), True),
                        StructField('distance', FloatType(), True),
                        StructField('parent_categories', StringType(), True),
                        StructField('coordinates.latitude', FloatType(), True),
                        StructField('coordinates.longitude', FloatType(), True),
                        StructField('location.address1', StringType(), True),
                        StructField('location.address2', StringType(), True),
                        StructField('location.address3', StringType(), True),
                        StructField('location.city', StringType(), True),
                        # Zip codes are identifiers, not numbers: kept as a string like the
                        # request's `zipcodes` field, so leading zeros survive and a quoted
                        # value does not fail an integer conversion.
                        # NOTE(review): the payload's actual zip_code JSON type is not
                        # visible here -- confirm against a sample event.
                        StructField('location.zip_code', StringType(), True),
                        StructField('location.country', StringType(), True),
                        StructField('location.state', StringType(), True),
                        StructField('location.display_address', StringType(), True)
                    ]))

In [136]:
# Inspect the schema Spark currently holds for the `event_data` column alone.
implicit_schema = zipcode_data.select(zipcode_data['event_data']).schema
implicit_schema

StructType(List(StructField(event_data,StringType,true)))

In [29]:
# Parse `event_data` with the explicit `schema` and explode the resulting array
# into one row per element, next to the request's zipcodes, timestamp and type.
(
    zipcode_data
    .select(
        'zipcodes',
        'timestamp',
        'event_type',
        explode(from_json('event_data', schema)),
    )
    .show(5)
)

NameError: name 'schema' is not defined

In [73]:
    # Load the Zillow CSV from the local filesystem; the first row holds the
    # column names.
    zillow = spark.read.csv(
        'file:///w205/w205_project_3_karl_joe_kasha/zillow_2021_11_17.csv',
        header=True,
    )
    zillow.show(5)

+--------+--------------------+--------------------+-------------+-----+-------+--------+---------+----+-------+------+
|    zpid|        full_address|      street_address|         city|state|zipcode|bedrooms|bathrooms|sqft|  price| owner|
+--------+--------------------+--------------------+-------------+-----+-------+--------+---------+----+-------+------+
|17875262|23043 seabrook ln...|   23043 seabrook ln|moreno valley|   ca|  92557|       2|        2| 936| 383900|zillow|
|55030278|12953 bermuda dun...|12953 bermuda dun...|  victorville|   ca|  92395|       3|        2|1415| 362200|zillow|
|69293332|41117 royal sunse...|41117 royal sunse...|lake elsinore|   ca|  92532|       5|        3|2750| 567900|zillow|
|69261278|30684 emperor dr,...|    30684 emperor dr| quail valley|   ca|  92587|       3|        2|2020| 661900|zillow|
|25513735|2693 brand dr, tu...|       2693 brand dr|       tustin|   ca|  92782|       4|        3|2100|1166200|zillow|
+--------+--------------------+---------

In [74]:
# Peek at the zipcode column of the Zillow data.
zillow.select('zipcode').show(5)

+-------+
|zipcode|
+-------+
|  92557|
|  92395|
|  92532|
|  92587|
|  92782|
+-------+
only showing top 5 rows



In [77]:
# Collect on the driver the `zipcodes` value of every 'get_zillow_data' event.
zillow_zipcodes = [
    request_row['zipcodes']
    for request_row in (
        zipcode_data
        .where(zipcode_data['event_type'] == 'get_zillow_data')
        .select('zipcodes')
        .collect()
    )
]
zillow_zipcodes

['92557']

In [78]:
# Keep only the Zillow rows whose zipcode is one of the collected zipcodes.
zillow.where(zillow['zipcode'].isin(zillow_zipcodes)).show(5)

+--------+--------------------+-----------------+-------------+-----+-------+--------+---------+----+------+------+
|    zpid|        full_address|   street_address|         city|state|zipcode|bedrooms|bathrooms|sqft| price| owner|
+--------+--------------------+-----------------+-------------+-----+-------+--------+---------+----+------+------+
|17875262|23043 seabrook ln...|23043 seabrook ln|moreno valley|   ca|  92557|       2|        2| 936|383900|zillow|
+--------+--------------------+-----------------+-------------+-----+-------+--------+---------+----+------+------+

