In [1]:
from __future__ import barry_as_FLUFL
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql import Row


In [2]:
# Local Spark session ("local[*]") that pulls the MongoDB Spark connector
# (mongo-spark-connector_2.12:3.0.1) via spark.jars.packages, so that
# format("mongo") is available for the read/write cells below.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("myApp")
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1")
    .getOrCreate()
)



In [3]:
# Source collection triphawk.restaurant as a cached RDD of Rows. Each document
# has an `_id` (with an `oid`), a `location`, a `type` and a `data` array of
# business Rows (see the cells below).
restaurantRDD = (
    spark.read.format("mongo")
    .option("uri", "mongodb://127.0.0.1/triphawk.restaurant")
    .load()
    .rdd
    .cache()
)

In [7]:
# Every business id across all documents, collected to the driver (handy for ad-hoc checks).
# NOTE(review): the saved output of this cell (2100) looks like a .count(), not a collect().
(
    restaurantRDD
    .flatMap(lambda doc: doc.data)
    .map(lambda business: business.id)
    .collect()
)

2100

In [5]:
def appendRowWithDictionary(a: Row, b: dict):
    """Merge the fields of Row ``a`` with the entries of ``b`` into a new dict.

    Keys present in both take their value from ``b``. The result is a plain
    ``dict`` (not a ``Row``); callers wrap it with ``Row(**...)``.
    """
    return {**a.asDict(), **b}

In [6]:
def putIdInside(id, array, keyName: str):
    """Copy each Row in ``array``, adding ``id.oid`` under the field ``keyName``.

    Returns a new list of ``Row`` objects in the same order as ``array``; an
    existing field named ``keyName`` is overwritten.
    """
    return [
        Row(**appendRowWithDictionary(item, {keyName: id.oid}))
        for item in array
    ]

In [7]:
# (business id, [category title, ...]) for every business in every document.
categoriesRDD = (
    restaurantRDD
    .flatMap(lambda doc: doc.data)
    .map(lambda business: (business.id, [category.title for category in business.categories]))
)


In [8]:
# Flatten every document into its business Rows, tagging each business with
# the ObjectId string of the document it came from (field `city_id`).
# A single flatMap replaces the former map(...) + flatMap(identity) pair.
innerRDD = restaurantRDD.flatMap(lambda doc: putIdInside(doc._id, doc.data, 'city_id'))

In [9]:
# Smallest `distance` recorded for each business id across all documents.
# reduceByKey combines values map-side, so (unlike groupByKey + min) it never
# has to materialise every distance of a key before taking the minimum.
minBusinessRDD = innerRDD.map(lambda business: (business.id, business.distance)).reduceByKey(min)
minBusinessRDD.first()

('zJJmydXzNX5vfbeuU_WDOg', 736.4107646831667)

In [10]:
# Business Rows (already tagged with `city_id`) keyed by business id.
# Reuses innerRDD rather than rebuilding the identical flatten-and-tag pipeline.
cityInBusinessRDD = innerRDD.keyBy(lambda business: business.id)
cityInBusinessRDD.first()

('zJJmydXzNX5vfbeuU_WDOg',
 Row(alias='somorrostro-barcelona', categories=[Row(alias='catalan', title='Catalan'), Row(alias='seafood', title='Seafood')], coordinates=Row(latitude=41.37911, longitude=2.18904), display_phone='+34 932 25 00 10', distance=736.4107646831667, id='zJJmydXzNX5vfbeuU_WDOg', image_url='https://s3-media2.fl.yelpcdn.com/bphoto/GHoJBoGtROMQmJLLb8rleQ/o.jpg', is_closed=False, location=Row(address1='Carrer de Sant Carles, 11', address2='', address3='', city='Barcelona', zip_code='08003', country='ES', state='B', display_address=['Carrer de Sant Carles, 11', '08003 Barcelona', 'Spain']), name='Somorrostro', phone='+34932250010', price='€€', rating=4.5, review_count=368, url='https://www.yelp.com/biz/somorrostro-barcelona?adjust_creative=RqA1ZEM6WDtG1E9qg1EMig&utm_campaign=yelp_api_v3&utm_medium=api_v3_business_search&utm_source=RqA1ZEM6WDtG1E9qg1EMig', city_id='6249a61d355e2a8dc7c00755'))

In [60]:
def updateCategories(row):
    """Return a copy of ``row`` whose ``categories`` field holds only the titles.

    Each element of ``row.categories`` must expose a ``title`` attribute
    (e.g. ``Row(alias=..., title=...)``); the titles keep their original order.
    """
    titles = list(map(lambda category: category.title, row.categories))
    return Row(**appendRowWithDictionary(row, {"categories": titles}))
    

In [64]:
def concatenateAddress(a1, a2, a3):
    """Join up to three address lines into one space-separated string.

    ``None`` and blank components are skipped, so ``("Main St 1", "", None)``
    gives ``"Main St 1"`` and ``("A", "", "C")`` gives ``"A C"``. The previous
    f-string version rendered ``None`` as the text "None" and produced a double
    space when a middle component was empty.
    """
    parts = (str(part).strip() for part in (a1, a2, a3) if part is not None)
    return " ".join(part for part in parts if part)

In [65]:
def updateAddress(row):
    """Return ``row.location`` as a new Row with an extra ``address`` field.

    ``address`` is ``address1``/``address2``/``address3`` combined by
    ``concatenateAddress``. Note the result is the *location* Row, not a copy
    of the business row itself.
    """
    location = row.location
    fullAddress = concatenateAddress(location.address1, location.address2, location.address3)
    return Row(**appendRowWithDictionary(location, {"address": fullAddress}))

In [68]:
def updateLocation(row, locationRow):
    """Return a copy of ``row`` whose ``location`` field is replaced by ``locationRow``."""
    fields = appendRowWithDictionary(row, {'location': locationRow})
    return Row(**fields)

In [103]:
def removeDataColumns(row, columnsFirst: list, columnsLocation: list):
    """Return a copy of ``row`` without the given top-level and ``location`` fields.

    Args:
        row: business Row with a nested ``location`` field.
        columnsFirst: iterable of top-level field names to drop.
        columnsLocation: iterable of field names to drop from ``location``.

    Returns:
        A new Row. Because the row is converted with ``asDict(True)``
        (recursive), nested Rows such as ``location`` and ``coordinates``
        become plain dicts in the result.

    Raises:
        KeyError: if a listed column is not present.
    """
    # Recursive conversion so the nested `location` can be edited as a dict.
    fields = row.asDict(True)
    for column in columnsFirst:
        fields.pop(column)

    for column in columnsLocation:
        fields['location'].pop(column)
    return Row(**fields)

In [106]:
# Assign every business to its nearest document (neighbourhood): among all the
# copies of a business (one per document it appears in), keep the one with the
# smallest `distance`, then clean it up for output.
# reduceByKey yields exactly one Row per business id. The previous
# join-with-minBusinessRDD + exact-float-equality filter emitted one row per
# copy that tied the minimum (e.g. the same business listed twice at the same
# distance), duplicating it downstream.
def _nearerBusiness(a, b):
    """Return whichever copy of a business is closer; ties go to the smaller city_id."""
    return a if (a.distance, a.city_id) <= (b.distance, b.city_id) else b

businessByDistanceRDD = (
    cityInBusinessRDD
    .reduceByKey(_nearerBusiness)
    .mapValues(updateCategories)
    .mapValues(lambda business: updateLocation(business, updateAddress(business)))
    .mapValues(lambda business: removeDataColumns(
        business,
        ['alias', 'display_phone', 'distance', 'is_closed'],
        ['address1', 'address2', 'address3', 'state', 'display_address'],
    ))
)

businessByDistanceRDD.first()

('zJJmydXzNX5vfbeuU_WDOg',
 Row(categories=['Catalan', 'Seafood'], coordinates={'latitude': 41.37911, 'longitude': 2.18904}, id='zJJmydXzNX5vfbeuU_WDOg', image_url='https://s3-media2.fl.yelpcdn.com/bphoto/GHoJBoGtROMQmJLLb8rleQ/o.jpg', location={'city': 'Barcelona', 'zip_code': '08003', 'country': 'ES', 'address': 'Carrer de Sant Carles, 11'}, name='Somorrostro', phone='+34932250010', price='€€', rating=4.5, review_count=368, url='https://www.yelp.com/biz/somorrostro-barcelona?adjust_creative=RqA1ZEM6WDtG1E9qg1EMig&utm_campaign=yelp_api_v3&utm_medium=api_v3_business_search&utm_source=RqA1ZEM6WDtG1E9qg1EMig', city_id='6249a61d355e2a8dc7c00755'))

In [122]:
# Group the selected businesses by the document they were assigned to:
# (city_id, [business Row, ...]).
# Fixed: previously referenced the undefined name `businessByDistance`
# (NameError on a fresh kernel); the RDD is `businessByDistanceRDD`.
businessesRDD = (
    businessByDistanceRDD
    .values()
    .keyBy(lambda business: business.city_id)
    .groupByKey()
    .mapValues(list)
)
businessesRDD.count()

21

In [129]:
# Top-level fields of each source document keyed by its ObjectId string:
# (oid, (location, type)).
firstLevelRDD = restaurantRDD.map(
    lambda doc: (doc._id.oid, (doc.location, doc.type))
)
firstLevelRDD.first()

('6249a61d355e2a8dc7c00755', ('Barceloneta', 'restaurant'))

In [144]:
# Join each place (location/type) with its grouped restaurants and rebuild the
# output document with fields _id, location, type, data.
def _toCityDocument(joined):
    """Turn ``(oid, (businesses, (location, type)))`` into the output document Row."""
    oid, (businesses, (location, docType)) = joined
    return Row(_id=oid, location=location, type=docType, data=businesses)

finalRDD = businessesRDD.join(firstLevelRDD).map(_toCityDocument)

In [146]:
# Build a DataFrame with a schema inferred from the Rows, and show that schema.
finalDF = spark.createDataFrame(finalRDD)
finalDF.printSchema()

root
 |-- _id: string (nullable = true)
 |-- location: string (nullable = true)
 |-- type: string (nullable = true)
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- categories: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- coordinates: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: double (valueContainsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- image_url: string (nullable = true)
 |    |    |-- location: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- phone: string (nullable = true)
 |    |    |-- price: string (nullable = true)
 |    |    |-- rating: double (nullable = true)
 |    |    |-- review_count: long (nullable = true)
 |    |    |-- url: string (nullable = true)
 |    |    |-- city_id: string (nullable = true)

In [148]:
# Write the reshaped documents (append mode) to triphawk.restaurant_new; the
# source collection triphawk.restaurant is not written to.
(
    finalDF.write
    .format("mongo")
    .mode("append")
    .option("uri", "mongodb://127.0.0.1/triphawk.restaurant_new")
    .save()
)