# Apache Spark

We use Spark to explore [Safegraph data](https://www.safegraph.com/covid-19-data-consortium) to better understand how NYC responded to the COVID-19 pandemic. Specifically, we look at the [Core Places](https://docs.safegraph.com/v4.0/docs#section-core-places) and [Weekly Pattern](https://docs.safegraph.com/v4.0/docs/places-schema#section-patterns) data sets to answer the following two questions:


1.   How many restaurants in NYC were closed right when the city shut down on March 17, 2020, and how many were closed by April 1, 2020?

2.   For those that were open on/after April 1, 2020 in question 1, which ones still received a high volume of visits (on any day on/after April 1)? What was the median dwelling time at each establishment in the first week of March (3/2-3/9) and in the first week of April (3/30-4/6)?

### Definitions

* *NYC*: a restaurant is considered to be in NYC if its city is `'New York'`, `'Brooklyn'`, `'Queens'`, `'Bronx'`, or `'Staten Island'`.

* *Open*: a restaurant is considered open for a day if it has 1 or more visitors reported on that day in the *Weekly Pattern* data set.

* *High Volume of Visits*: a restaurant is considered to receive a high volume of visits if 50 or more visits are reported for it on a single day in the *Weekly Pattern* data set.

* *Median Dwelling Time*: although the *Weekly Pattern* data set reports the median dwelling time in the **`median_dwell`** field, we want to exclude visitors staying more than 4 hours (mostly employees) when calculating our median dwelling time. Thus, the *median dwelling time* is computed from **`bucketed_dwell_times`** without the **>240** bucket. It should take one of the values `'<5'`, `'5-10'`, `'11-20'`, `'21-60'`, `'61-120'`, `'121-240'`, or `'N/A'` if it cannot be determined.
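
Stated as a formula, the median bucket is the first bucket whose running visit count exceeds half of the total; this is the rule that `medianDwellBucket` in step (I) implements. Here $c_1, \dots, c_6$ denote the visit counts in the buckets `'<5'` through `'121-240'`, in that order, with the `>240` bucket excluded:

$$
k^{*} = \min\Big\{\, k \in \{1,\dots,6\} \;:\; \sum_{j=1}^{k} c_j > \frac{1}{2}\sum_{j=1}^{6} c_j \Big\}
$$

Bucket $k^{*}$ is reported as the median, and the result is `'N/A'` when $\sum_{j=1}^{6} c_j = 0$. For example, with counts 11, 70, 68, 65, 36, 37 the total is 287 and half of it is 143.5; the running totals are 11, 81, 149, ..., so $k^{*} = 3$, i.e. `'11-20'`.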

## Requirements

* You have to use Spark for this assignment.

* Our data sets (`core_poi_ny.csv` and `nyc_restaurant_pattern.csv`) are assumed to be on HDFS and can only be accessed using Spark (either as a Spark DataFrame or an RDD).

* You are not allowed to collect the raw data to the notebook and process it without Spark. It is okay, however, to collect intermediate data for processing; just try to collect as little as possible.

## Environment Setup

In [1]:
!gdown --id 1ZK8ql8arn0pkIJZIfknNscKXn85L9ZXX -O core_poi_ny.csv
!gdown --id 1NeXqsAeIJ8zukHt5cR2s19beDoz2Xw5d -O nyc_restaurant_pattern.csv
!pip install pyspark

import csv
import datetime
import json
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DateType, IntegerType, MapType, StringType

sc = pyspark.SparkContext()
spark = SparkSession(sc)

Downloading...
From: https://drive.google.com/uc?id=1ZK8ql8arn0pkIJZIfknNscKXn85L9ZXX
To: /content/core_poi_ny.csv
100% 95.6M/95.6M [00:00<00:00, 103MB/s] 
Downloading...
From: https://drive.google.com/uc?id=1NeXqsAeIJ8zukHt5cR2s19beDoz2Xw5d
To: /content/nyc_restaurant_pattern.csv
100% 101M/101M [00:00<00:00, 122MB/s]  
Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
Collecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=19e2540601af83e1e2d88dc271ee574d699a57dba2771a2a460db17ecb0c3f3a
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built 

## Task 1
*Question: How many restaurants in NYC were closed right when the city shut down on March 17, 2020, and how many were closed by April 1, 2020?*

The final output of the Task 1 solution should take the following form:
```
The number of restaurants in NYC closed by March 17, 2020: 49
The number of restaurants in NYC closed by April 01, 2020: 496
```

### Data preparation

We first work with the records of only 3 restaurants in NYC; step (F) later reruns the same steps on the full data set. After this cell, **`dfPattern`** is the data frame holding the NYC restaurant pattern data.



In [2]:
RESTAURANTS = set(['sg:2cbad77e421c4ccb8ffd20d2a6b81f78',
                   'sg:d3fdb6458c544bb687d2da3eb1c8e28e',
                   'sg:0e86fc3cfbc1417fab6a5ef1b4a63026'])

dfPattern = spark.read.csv('nyc_restaurant_pattern.csv', 
                           header=True, escape='"') \
    .where(F.col('safegraph_place_id').isin(RESTAURANTS))

dfPattern.show()

+-------------------+--------------------+---------------+-------------------------+--------------------+--------------+--------+------+-----------+----------------+-------------------+------+--------------------+--------------------+----------------+------------------+--------------------+--------------------+------------+--------------------+--------------------+-------------------------+------------------+------------+--------------------+----------------------+-----------------------+--------------------+
|           placekey|  safegraph_place_id|parent_placekey|parent_safegraph_place_id|       location_name|street_address|    city|region|postal_code|iso_country_code|safegraph_brand_ids|brands|    date_range_start|      date_range_end|raw_visit_counts|raw_visitor_counts|       visits_by_day| visits_by_each_hour|     poi_cbg|   visitor_home_cbgs|visitor_daytime_cbgs|visitor_country_of_origin|distance_from_home|median_dwell|bucketed_dwell_times|related_same_day_brand|related_same_wee

### STRATEGY:

Our strategy with DataFrames differs from the one we would use with RDDs. To make the best use of DataFrame functionality, we should work with structured data as much as possible: keep the data in tabular form whenever we can and operate on whole columns. With that in mind, instead of summing up the visits from a given date (e.g. March 17) onward, we explode the **`visits_by_day`** column into **`date`** and **`visits`** columns and then perform our filtering on those.


### A. Keep only `safegraph_place_id`, `date_range_start`, and `visits_by_day` for each record.

**NOTE**: we do not need `date_range_end` in this task since we will be exploding all 7 days of visits starting from `date_range_start`.

In [3]:
dfA = dfPattern.select('safegraph_place_id', 'date_range_start', 'visits_by_day')
dfA.show()

+--------------------+--------------------+--------------------+
|  safegraph_place_id|    date_range_start|       visits_by_day|
+--------------------+--------------------+--------------------+
|sg:d3fdb6458c544b...|2020-03-02T00:00:...|     [3,7,2,3,3,4,3]|
|sg:2cbad77e421c4c...|2020-03-02T00:00:...|     [1,2,0,1,3,0,1]|
|sg:0e86fc3cfbc141...|2020-03-02T00:00:...|[114,112,76,106,9...|
|sg:d3fdb6458c544b...|2020-03-09T00:00:...|     [0,7,5,3,4,2,2]|
|sg:2cbad77e421c4c...|2020-03-09T00:00:...|     [2,0,0,1,1,0,0]|
|sg:0e86fc3cfbc141...|2020-03-09T00:00:...|[101,116,106,100,...|
|sg:d3fdb6458c544b...|2020-03-16T00:00:...|     [3,2,2,1,3,2,0]|
|sg:2cbad77e421c4c...|2020-03-16T00:00:...|     [1,0,0,0,0,0,0]|
|sg:0e86fc3cfbc141...|2020-03-16T00:00:...|[69,74,97,104,71,...|
|sg:d3fdb6458c544b...|2020-03-23T00:00:...|     [1,2,3,2,1,0,0]|
|sg:0e86fc3cfbc141...|2020-03-23T00:00:...|[62,78,73,66,52,3...|
|sg:d3fdb6458c544b...|2020-03-30T00:00:...|     [1,1,0,0,0,0,0]|
|sg:0e86fc3cfbc141...|202

### B. Complete the `expandVisits()` function below to explode the `visits_by_day` column in `dfA` into 7 rows consisting of the date and the visit counts for that date starting from `date_range_start`.

We name the new columns `date` and `visits`, and drop the existing `date_range_start` and `visits_by_day` columns. To do this, we write a User-Defined Function (UDF) that takes 2 parameters (`date_range_start` and `visits_by_day`) and returns a **MapType()** of 7 entries, whose keys are the dates and whose values are the visits on those dates. Exploding that map with `F.explode` then yields one row per key-value pair.

**NOTE:** we also truncate the `date_range_start` datetime string so that only the date is kept.

In [4]:
def expandVisits(date_range_start, visits_by_day):
    '''
    This function returns a Python dict{datetime.date: int} where:
      key   : a datetime.date, e.g. datetime.date(2020, 3, 17)
      value : the number of visits for that day
    '''
    date = date_range_start.split('T')[0].split('-')
    year = int(date[0])
    month = int(date[1])
    day = int(date[2])
    vbd = visits_by_day[1:-1].split(',')
    mydict = dict()

    week_day_count = 0
    for i in vbd:
      mydict[datetime.date(year, month, day)+datetime.timedelta(days=week_day_count)] = int(i)
      week_day_count += 1
    
    return mydict
    

udfExpand = F.udf(expandVisits, MapType(DateType(), IntegerType()))
dfB = dfA.select('safegraph_place_id',
                 F.explode(udfExpand('date_range_start', 'visits_by_day')) \
                    .alias('date', 'visits'))

dfB.show()

+--------------------+----------+------+
|  safegraph_place_id|      date|visits|
+--------------------+----------+------+
|sg:d3fdb6458c544b...|2020-03-02|     3|
|sg:d3fdb6458c544b...|2020-03-03|     7|
|sg:d3fdb6458c544b...|2020-03-04|     2|
|sg:d3fdb6458c544b...|2020-03-05|     3|
|sg:d3fdb6458c544b...|2020-03-06|     3|
|sg:d3fdb6458c544b...|2020-03-07|     4|
|sg:d3fdb6458c544b...|2020-03-08|     3|
|sg:2cbad77e421c4c...|2020-03-02|     1|
|sg:2cbad77e421c4c...|2020-03-03|     2|
|sg:2cbad77e421c4c...|2020-03-04|     0|
|sg:2cbad77e421c4c...|2020-03-05|     1|
|sg:2cbad77e421c4c...|2020-03-06|     3|
|sg:2cbad77e421c4c...|2020-03-07|     0|
|sg:2cbad77e421c4c...|2020-03-08|     1|
|sg:0e86fc3cfbc141...|2020-03-02|   114|
|sg:0e86fc3cfbc141...|2020-03-03|   112|
|sg:0e86fc3cfbc141...|2020-03-04|    76|
|sg:0e86fc3cfbc141...|2020-03-05|   106|
|sg:0e86fc3cfbc141...|2020-03-06|    97|
|sg:0e86fc3cfbc141...|2020-03-07|    59|
+--------------------+----------+------+
only showing top 20 rows
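
To check the UDF logic outside Spark, the same per-row expansion can be written more compactly in plain Python. This is a sketch rather than the notebook's code: the name `expand_visits` is hypothetical, and it assumes that `date_range_start` begins with a `YYYY-MM-DD` date and that `visits_by_day` is a JSON array of integers, as the sample rows of `dfA` suggest.

```python
import datetime
import json


def expand_visits(date_range_start, visits_by_day):
    """Map each day of the week starting at date_range_start to its visit count.

    Hypothetical plain-Python counterpart of expandVisits(); assumes
    date_range_start begins with 'YYYY-MM-DD' and visits_by_day is a JSON array.
    """
    start = datetime.date.fromisoformat(date_range_start[:10])
    visits = json.loads(visits_by_day)  # '[3,7,2]' -> [3, 7, 2]
    # enumerate() supplies the day offset that expandVisits() tracks by hand
    return {start + datetime.timedelta(days=i): v for i, v in enumerate(visits)}


# Illustrative input shaped like the first row of dfA
week = expand_visits('2020-03-02T00:00:00', '[3,7,2,3,3,4,3]')
for day, count in sorted(week.items()):
    print(day, count)
```

Parsing with `json.loads` also copes with an empty array (`'[]'`), for which slicing off the brackets and calling `split(',')` would hand `int()` an empty string.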

### C. Next we drop all records in `dfB` with a `date` before March 17.



In [5]:
dfC = dfB.filter(dfB["date"] >= datetime.date(2020,3,17))
dfC.show()

+--------------------+----------+------+
|  safegraph_place_id|      date|visits|
+--------------------+----------+------+
|sg:d3fdb6458c544b...|2020-03-17|     2|
|sg:d3fdb6458c544b...|2020-03-18|     2|
|sg:d3fdb6458c544b...|2020-03-19|     1|
|sg:d3fdb6458c544b...|2020-03-20|     3|
|sg:d3fdb6458c544b...|2020-03-21|     2|
|sg:d3fdb6458c544b...|2020-03-22|     0|
|sg:2cbad77e421c4c...|2020-03-17|     0|
|sg:2cbad77e421c4c...|2020-03-18|     0|
|sg:2cbad77e421c4c...|2020-03-19|     0|
|sg:2cbad77e421c4c...|2020-03-20|     0|
|sg:2cbad77e421c4c...|2020-03-21|     0|
|sg:2cbad77e421c4c...|2020-03-22|     0|
|sg:0e86fc3cfbc141...|2020-03-17|    74|
|sg:0e86fc3cfbc141...|2020-03-18|    97|
|sg:0e86fc3cfbc141...|2020-03-19|   104|
|sg:0e86fc3cfbc141...|2020-03-20|    71|
|sg:0e86fc3cfbc141...|2020-03-21|    37|
|sg:0e86fc3cfbc141...|2020-03-22|    33|
|sg:d3fdb6458c544b...|2020-03-23|     1|
|sg:d3fdb6458c544b...|2020-03-24|     2|
+--------------------+----------+------+
only showing top 20 rows

### D. Find the max number of visits for each restaurant. This is similar to a word count and can be done by "reducing" all records by restaurant ID with a `max` operator. Name this new column (of the max visits) `max_visits`.

In [6]:
dfD = dfC.groupBy('safegraph_place_id') \
      .max("visits").withColumnRenamed("max(visits)", "max_visits")
dfD.show()

+--------------------+----------+
|  safegraph_place_id|max_visits|
+--------------------+----------+
|sg:d3fdb6458c544b...|         3|
|sg:2cbad77e421c4c...|         0|
|sg:0e86fc3cfbc141...|       104|
+--------------------+----------+



### E. Finally, we count the number of records in `dfD` with 0 visits, i.e. restaurants whose maximum number of daily visits from March 17 onward is 0. This is our final answer.

In [7]:
dfE = dfD.filter(dfD.max_visits == 0)
dfE.show()
dfE.count()

+--------------------+----------+
|  safegraph_place_id|max_visits|
+--------------------+----------+
|sg:2cbad77e421c4c...|         0|
+--------------------+----------+



1

### F. Putting it all together, replace `dfPattern` with the full data set and rerun your code from the above steps to get the final answer.

In [8]:
dfPattern = spark.read.csv('nyc_restaurant_pattern.csv', 
                           header=True, escape='"')

dfA = dfPattern.select('safegraph_place_id', 'date_range_start', 'visits_by_day')

def expandVisits(date_range_start, visits_by_day):
    '''
    This function returns a Python dict{datetime.date: int} where:
      key   : a datetime.date, e.g. datetime.date(2020, 3, 17)
      value : the number of visits for that day
    '''
    date = date_range_start.split('T')[0].split('-')
    year = int(date[0])
    month = int(date[1])
    day = int(date[2])
    vbd = visits_by_day[1:-1].split(',')
    mydict = dict()

    week_day_count = 0
    for i in vbd:
      mydict[datetime.date(year, month, day)+datetime.timedelta(days=week_day_count)] = int(i)
      week_day_count += 1
    
    return mydict
    

udfExpand = F.udf(expandVisits, MapType(DateType(), IntegerType()))
dfB = dfA.select('safegraph_place_id',
                 F.explode(udfExpand('date_range_start', 'visits_by_day')) \
                    .alias('date', 'visits'))

dfC = dfB.filter(dfB["date"] >= datetime.date(2020,3,17))
dfD = dfC.groupBy('safegraph_place_id') \
      .max("visits").withColumnRenamed("max(visits)", "max_visits")
dfE = dfD.filter(dfD.max_visits == 0)
print("The number of restaurants in NYC closed by March 17, 2020:",dfE.count())

The number of restaurants in NYC closed by March 17, 2020: 49
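
The cell in (F) prints only the March 17 count, while the expected Task 1 output also reports how many restaurants were closed by April 1. That second number comes from the same chain (C) to (E) with `datetime.date(2020,4,1)` as the cutoff. The plain-Python model below, with a hypothetical `count_closed` helper and made-up sample rows, shows that the cutoff is the only thing that changes; it is meant to make the logic easy to test, not to replace the Spark job.

```python
import datetime


def count_closed(daily_visits, cutoff):
    """Count places whose maximum daily visits on/after `cutoff` is 0.

    daily_visits is a hypothetical in-memory stand-in for dfB:
    an iterable of (safegraph_place_id, date, visits) tuples.
    """
    max_visits = {}
    for place_id, day, visits in daily_visits:
        if day >= cutoff:  # step (C): keep dates on/after the cutoff
            # step (D): reduce by place with max(); visit counts are non-negative
            max_visits[place_id] = max(visits, max_visits.get(place_id, 0))
    # step (E): closed means no visits on any remaining day
    return sum(1 for m in max_visits.values() if m == 0)


# Made-up sample rows, for illustration only
rows = [
    ('a', datetime.date(2020, 3, 17), 2),
    ('a', datetime.date(2020, 4, 2), 0),
    ('b', datetime.date(2020, 3, 18), 0),
    ('b', datetime.date(2020, 4, 1), 0),
    ('c', datetime.date(2020, 4, 1), 5),
]
print('Closed by March 17:', count_closed(rows, datetime.date(2020, 3, 17)))
print('Closed by April 1:', count_closed(rows, datetime.date(2020, 4, 1)))
```

As in the Spark version, a place with no rows on or after the cutoff is not counted at all, because the filter removes it before the grouping step.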


## Task 2
*Question: For those that were open on/after April 1, 2020 in question 1, which ones still received a high volume of visits (on any day on/after April 1)? What was the median dwelling time at each establishment in the first week of March (3/2-3/9) and in the first week of April (3/30-4/6)?*

The final output of Task 2 should be in a CSV-like format (one establishment per line), **sorted alphabetically by Restaurant_Name**:
```
Restaurant_Name,Street_Address,City,Median_Dwell_Bucket_March,Median_Dwell_Bucket_April
```

Expected output:
```
3 In 1 Kitchen,4902 Fort Hamilton Pkwy,Brooklyn,21-60,21-60
Agape Cafe,655 W 34th St,New York,21-60,21-60
Buffalo Wild Wings,632 Gateway Dr,Brooklyn,21-60,11-20
Burger King,2800 Hylan Blvd,Staten Island,11-20,11-20
Cafe Deli cious,491 1st Ave,New York,21-60,21-60
Cinnabon,1313 Broadway,New York,11-20,11-20
Dunkin',150 B Greaves Laneevergreen Plaza,New York,11-20,21-60
Dunkin',1752 Shore Pkwybjs Wholesale Club,New York,21-60,21-60
Dunkin',2449 Veterans Rd Wshop Rite,New York,21-60,21-60
Dunkin',590 Gateway Dr,Brooklyn,21-60,21-60
Dunkin',625 Atlantic Aveatlantic Center Mall,New York,21-60,21-60
Dunkin',6620 Avenue U,Brooklyn,11-20,11-20
Food Express Truck,2501 Forest Ave Across From Home Depot,New York,11-20,21-60
Golden Krust Caribbean Bakery and Grill,1364 Pennsylvania Ave,Brooklyn,11-20,11-20
Harlem Tavern,2153 Frederick Douglass Blvd,New York,11-20,11-20
Hutong,731 Lexington Ave,New York,21-60,21-60
Khan Express,1275 York Ave,New York,21-60,21-60
King Cab Halal Food,10th Ave & 28th St,New York,N/A,21-60
McDonald's,1403 Mermaid Ave,Brooklyn,5-10,5-10
McDonald's,1600 Bruckner Blvd,Bronx,5-10,11-20
McDonald's,2154 Hylan Blvd,Staten Island,11-20,5-10
McDonald's,260 Page Ave,Staten Island,11-20,5-10
Ninja 86 Sushi,2274 86th St,Brooklyn,11-20,11-20
PROOF Coffee Roasters,335 E 27th St,New York,21-60,21-60
Pizza Gusta,2945 Bruckner Blvd,Bronx,11-20,21-60
Plaza Cafeteria Mount Sinai Hospital,1428 Madison Ave,New York,21-60,21-60
Red Mango,234 W 34th St,New York,11-20,11-20
Roti R Us,1493 Albany Ave,Brooklyn,21-60,21-60
Starbucks,655 W 34th St,New York,11-20,21-60
The Dumplin Shop,3852 Bronxwood Ave,Bronx,21-60,21-60
```

If there is no dwelling time information for a particular week, please report as **`'N/A'`**.

Please note that you can find *Street Address* and *City* information from the *Core Places* data set.

### G. Similar to what we did in (E), but instead of counting restaurants with 0 visits, we keep restaurants whose maximum number of daily visits is 50 or more. Unlike (E), we also keep the following fields for use in the output: `location_name`, `street_address`, and `city`.

We can use the same `udfExpand` from (B), but the data must be filtered to start from April 1 instead of March 17. Note that we cannot reuse `dfA` or `dfB`, since they do not include the 3 new columns; we start from `dfPattern` again.

After this, we can drop the column `max_visits` since it will not be used any more.



In [9]:

dfPattern = spark.read.csv('nyc_restaurant_pattern.csv', 
                           header=True, escape='"') \
    .where(F.col('safegraph_place_id').isin(RESTAURANTS))

dfG = dfPattern.select('safegraph_place_id', 'date_range_start', 'visits_by_day', 'location_name', 'street_address', 'city')


def expandVisits(date_range_start, visits_by_day):
    '''
    This function returns a Python dict{datetime.date: int} where:
      key   : a datetime.date, e.g. datetime.date(2020, 3, 17)
      value : the number of visits for that day
    '''
    date = date_range_start.split('T')[0].split('-')
    year = int(date[0])
    month = int(date[1])
    day = int(date[2])
    vbd = visits_by_day[1:-1].split(',')
    mydict = dict()

    week_day_count = 0
    for i in vbd:
      mydict[datetime.date(year, month, day)+datetime.timedelta(days=week_day_count)] = int(i)
      week_day_count += 1
    
    return mydict
    

udfExpand = F.udf(expandVisits, MapType(DateType(), IntegerType()))
dfG = dfG.select('safegraph_place_id',
                 F.explode(udfExpand('date_range_start', 'visits_by_day')) \
                    .alias('date', 'visits'), \
                 'location_name', 'street_address', 'city')

dfG = dfG.filter(dfG["date"] >= datetime.date(2020,4,1))

dfG = dfG.groupBy('safegraph_place_id').agg({'visits':'max', 'location_name':'first', 'street_address':'first', 'city':'first'}) \
        .withColumnRenamed('max(visits)', 'max_visits') \
        .withColumnRenamed('first(location_name)', 'location_name') \
        .withColumnRenamed('first(street_address)', 'street_address') \
        .withColumnRenamed('first(city)', 'city')

      
dfG = dfG.filter(dfG.max_visits >= 50)
dfG.show()

dfG = dfG.drop('max_visits')
dfG.show()

+--------------------+----------+--------------+---------------+--------+
|  safegraph_place_id|max_visits|street_address|  location_name|    city|
+--------------------+----------+--------------+---------------+--------+
|sg:0e86fc3cfbc141...|        68|   491 1st Ave|Cafe Deli cious|New York|
+--------------------+----------+--------------+---------------+--------+

+--------------------+--------------+---------------+--------+
|  safegraph_place_id|street_address|  location_name|    city|
+--------------------+--------------+---------------+--------+
|sg:0e86fc3cfbc141...|   491 1st Ave|Cafe Deli cious|New York|
+--------------------+--------------+---------------+--------+
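
Step (G) keeps, per restaurant, the maximum daily visits from April 1 on plus the first value of each descriptive field. A plain-Python model of that aggregation is sketched below; the helper `high_volume_places` and the rows are made up for illustration, and the sketch assumes `location_name`, `street_address`, and `city` are identical across a restaurant's weekly records, which is what makes `first` a safe choice.

```python
import datetime

APRIL_1 = datetime.date(2020, 4, 1)


def high_volume_places(rows, cutoff=APRIL_1, threshold=50):
    """Return {place_id: (name, address, city)} for places with a day of >= threshold visits.

    rows is a hypothetical stand-in for the exploded data in (G):
    (place_id, date, visits, location_name, street_address, city) tuples.
    """
    best = {}
    info = {}
    for place_id, day, visits, name, address, city in rows:
        if day < cutoff:
            continue  # only days on/after the cutoff count
        best[place_id] = max(visits, best.get(place_id, 0))
        info.setdefault(place_id, (name, address, city))  # like F.first()
    return {pid: info[pid] for pid, peak in best.items() if peak >= threshold}


# Made-up rows: 'x' peaks before the cutoff, 'y' after it
rows = [
    ('x', datetime.date(2020, 3, 30), 90, 'Cafe X', '1 A St', 'New York'),
    ('x', datetime.date(2020, 4, 2), 40, 'Cafe X', '1 A St', 'New York'),
    ('y', datetime.date(2020, 4, 3), 68, 'Deli Y', '2 B Ave', 'Brooklyn'),
]
print(high_volume_places(rows))
```

Restaurant `x` is dropped because its 90 visits fall before April 1, which is exactly why the date filter must run before the `max` aggregation.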



### H. We also need to filter the pattern data set to keep only the records for the first week of March and the first week of April, so that we can compute the median dwelling time.

Complete the transformation below to include `safegraph_place_id`, `date`, and `bucketed_dwell_times` columns, where `date` is derived from `date_range_start` to keep only the date if it is either `'2020-03-02'` or `'2020-03-30'`.

In [10]:
dfH = dfPattern.select('safegraph_place_id', 'date_range_start', 'bucketed_dwell_times')
dfH = dfH.withColumn("date_range_start", F.split(dfH.date_range_start, 'T').getItem(0)).withColumnRenamed('date_range_start', 'date')
dfH = dfH.filter((dfH.date == '2020-03-02') | (dfH.date == '2020-03-30'))
dfH = dfH.withColumn("date", F.to_date(dfH.date, 'yyyy-MM-dd'))

dfH.show()

+--------------------+----------+--------------------+
|  safegraph_place_id|      date|bucketed_dwell_times|
+--------------------+----------+--------------------+
|sg:d3fdb6458c544b...|2020-03-02|{"<5":2,"5-10":5,...|
|sg:2cbad77e421c4c...|2020-03-02|{"<5":1,"5-10":1,...|
|sg:0e86fc3cfbc141...|2020-03-02|{"<5":17,"5-10":1...|
|sg:d3fdb6458c544b...|2020-03-30|{"<5":0,"5-10":1,...|
|sg:0e86fc3cfbc141...|2020-03-30|{"<5":11,"5-10":7...|
+--------------------+----------+--------------------+



### I. Complete the `medianDwellBucket` function below, which takes a `bucketed_dwell_times` string like those in `dfH` and returns the median bucket label after removing the `">240"` bucket.

In [11]:
testI1 = '{"<5":11,"5-10":70,"11-20":68,"21-60":65,"61-120":36,"121-240":37,">240":101}'
testI2 = '{"<5":1,"5-10":2,"11-20":1,"21-60":3,"61-120":1,"121-240":0,">240":1}'

def medianDwellBucket(dwells):
  dwells = json.loads(dwells)
  del dwells['>240']
  total = sum(dwells.values())
  middle = total/2
  tmp = 0
  for key in dwells:
    tmp = tmp + dwells[key]

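    if tmp > middle:
      return key

  # Every visit fell in the excluded '>240' bucket (or none were recorded),
  # so the median cannot be determined
  return 'N/A'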


print(medianDwellBucket(testI1)) # should be '11-20'
print(medianDwellBucket(testI2)) # should be '21-60'

11-20
21-60
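
The loop in `medianDwellBucket` finds the first bucket whose running total exceeds half of all remaining visits. The same rule can be expressed with `itertools.accumulate` and `bisect.bisect_right`, as in the alternative sketch below. The helper name `median_dwell_bucket` is hypothetical; it assumes the six labels from the definitions are the only buckets besides `>240` and treats a missing bucket as 0. It returns `'N/A'` when no visits remain and when the dwell string itself is missing.

```python
import bisect
import itertools
import json

# Bucket labels in ascending order, excluding '>240' (taken from the definitions)
BUCKETS = ['<5', '5-10', '11-20', '21-60', '61-120', '121-240']


def median_dwell_bucket(dwells_json):
    """Return the bucket holding the median visit, ignoring '>240'; 'N/A' if undetermined."""
    if not dwells_json:
        return 'N/A'  # missing dwell information
    counts = json.loads(dwells_json)
    values = [counts.get(label, 0) for label in BUCKETS]
    total = sum(values)
    if total == 0:
        return 'N/A'  # no visits shorter than 4 hours
    running = list(itertools.accumulate(values))
    # Index of the first running total strictly greater than total / 2,
    # the same rule as the `tmp > middle` test in medianDwellBucket()
    return BUCKETS[bisect.bisect_right(running, total / 2)]


print(median_dwell_bucket('{"<5":11,"5-10":70,"11-20":68,"21-60":65,"61-120":36,"121-240":37,">240":101}'))
print(median_dwell_bucket('{"<5":1,"5-10":2,"11-20":1,"21-60":3,"61-120":1,"121-240":0,">240":1}'))
```

Iterating over the fixed `BUCKETS` list, rather than over the parsed dict, also removes any reliance on the key order of the JSON string.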


### J. Using the `medianDwellBucket()` function above, we transform the `bucketed_dwell_times` column in `dfH` into the median bucket label, named `median_dwell`.

In [12]:
udfMedian = F.udf(lambda x: medianDwellBucket(x), StringType())

dfJ = dfH.withColumn('median_dwell', udfMedian('bucketed_dwell_times')) \
    .select('safegraph_place_id', 'date', 'median_dwell')
dfJ.show()

+--------------------+----------+------------+
|  safegraph_place_id|      date|median_dwell|
+--------------------+----------+------------+
|sg:d3fdb6458c544b...|2020-03-02|       21-60|
|sg:2cbad77e421c4c...|2020-03-02|       21-60|
|sg:0e86fc3cfbc141...|2020-03-02|       21-60|
|sg:d3fdb6458c544b...|2020-03-30|       21-60|
|sg:0e86fc3cfbc141...|2020-03-30|       21-60|
+--------------------+----------+------------+



### K. We join `dfJ` with `dfG` (the list of restaurants with at least 50 visits on some day on/after April 1).

For each restaurant, we should usually have two records: one for the week of `'2020-03-02'` and another for the week of `'2020-03-30'`. A restaurant with no pattern record for one of those weeks has only one.

In [13]:
dfK = dfJ.join(dfG, 'safegraph_place_id')
dfK.show()

+--------------------+----------+------------+--------------+---------------+--------+
|  safegraph_place_id|      date|median_dwell|street_address|  location_name|    city|
+--------------------+----------+------------+--------------+---------------+--------+
|sg:0e86fc3cfbc141...|2020-03-30|       21-60|   491 1st Ave|Cafe Deli cious|New York|
|sg:0e86fc3cfbc141...|2020-03-02|       21-60|   491 1st Ave|Cafe Deli cious|New York|
+--------------------+----------+------------+--------------+---------------+--------+



### L. Next, we collapse the two weeks of data for each restaurant into a map (keyed by the week's start date).

This can be done by grouping records by `safegraph_place_id`, then, for each group, turning the lists of `date` and `median_dwell` into a map with `map_from_arrays` while keeping the first value of each of the other columns.



In [14]:
dfL = dfK.groupBy('safegraph_place_id') \
    .agg(F.first('location_name').alias('location_name'), 
         F.first('street_address').alias('street_address'),
         F.first('city').alias('city'), 
         F.map_from_arrays(F.collect_list('date'), 
                           F.collect_list('median_dwell')).alias('dwells'))

dfL.show()

+--------------------+---------------+--------------+--------+--------------------+
|  safegraph_place_id|  location_name|street_address|    city|              dwells|
+--------------------+---------------+--------------+--------+--------------------+
|sg:0e86fc3cfbc141...|Cafe Deli cious|   491 1st Ave|New York|{2020-03-30 -> 21...|
+--------------------+---------------+--------------+--------+--------------------+



### M. Before we get the expected output, we expand the `dwells` column into two columns, one for each month. Then we can safely drop the `safegraph_place_id` and `dwells` columns and replace any missing (null) value with the string 'N/A'.

In [15]:
dfM = dfL \
    .withColumn('March', dfL.dwells[datetime.date(2020,3,2)]) \
    .withColumn('April', dfL.dwells[datetime.date(2020,3,30)]) \
    .drop('safegraph_place_id', 'dwells') \
    .fillna('N/A', ['March', 'April'])
dfM.show()

+---------------+--------------+--------+-----+-----+
|  location_name|street_address|    city|March|April|
+---------------+--------------+--------+-----+-----+
|Cafe Deli cious|   491 1st Ave|New York|21-60|21-60|
+---------------+--------------+--------+-----+-----+
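
Steps (L) and (M) amount to building a small week-to-median map per restaurant and then looking up two fixed keys, defaulting to 'N/A'. A plain-Python model is sketched below with a hypothetical `collapse_weeks` helper and made-up records; it assumes every `median_dwell` value is a string label, so 'N/A' here only stands for a week with no record after the join.

```python
import datetime

# Week start dates used as map keys in steps (L) and (M)
MARCH_WEEK = datetime.date(2020, 3, 2)
APRIL_WEEK = datetime.date(2020, 3, 30)


def collapse_weeks(records):
    """Return {place_id: (march_median, april_median)}, using 'N/A' for missing weeks.

    records is a hypothetical stand-in for dfK: (place_id, week_start, median_dwell) tuples.
    """
    dwells = {}
    for place_id, week, median in records:
        dwells.setdefault(place_id, {})[week] = median  # like map_from_arrays per group
    # like the dwells[...] lookups followed by fillna('N/A')
    return {pid: (weeks.get(MARCH_WEEK, 'N/A'), weeks.get(APRIL_WEEK, 'N/A'))
            for pid, weeks in dwells.items()}


# Made-up records: 'b' has no record for the March week
records = [
    ('a', MARCH_WEEK, '21-60'),
    ('a', APRIL_WEEK, '11-20'),
    ('b', APRIL_WEEK, '21-60'),
]
print(collapse_weeks(records))
```

One subtlety on the Spark side: `collect_list` skips null values, so a null `median_dwell` would leave the collected date and median arrays with different lengths, and `map_from_arrays` requires arrays of equal length. A median function that returns the string 'N/A' rather than None sidesteps this.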



### N. Finally, we can join all the columns and prepare for our output.

In [16]:
print('\n'.join(sorted(dfM.rdd.map(lambda x: ','.join(x)).collect())))

Cafe Deli cious,491 1st Ave,New York,21-60,21-60
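
Joining fields with `','.join` works for the rows above, but a name or address containing a comma would produce a line with extra columns, and sorting the joined strings compares whole lines rather than names alone: a hypothetical `"Dunkin' X,..."` line would sort before `"Dunkin',..."` because a space comes before `','`. A sketch using Python's `csv` module (already imported in the setup cell) and sorting on the row tuples addresses both; the helper `to_csv_lines` is hypothetical and expects `(name, street_address, city, march, april)` tuples such as those returned by `dfM.collect()`.

```python
import csv
import io


def to_csv_lines(rows):
    """Format (name, street_address, city, march, april) tuples as sorted CSV text.

    Hypothetical helper; rows could come from dfM.collect(), whose Row objects are tuples.
    """
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator='\n')
    # Sorting tuples compares Restaurant_Name first, then the other fields as tie-breakers
    writer.writerows(sorted(rows))
    return buffer.getvalue()


# Made-up rows; the comma in the first name shows csv's quoting
rows = [
    ('Pizza Gusta', '2945 Bruckner Blvd', 'Bronx', '11-20', '21-60'),
    ('Cafe, Deli', '1 A St', 'New York', 'N/A', '5-10'),
]
print(to_csv_lines(rows), end='')
```

Python's string ordering is case-sensitive, so a name such as `PROOF Coffee Roasters` sorts before `Pizza Gusta`, which matches the order in the expected output.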


### O. Putting it all together, replace `dfPattern` with the full data set and rerun your code from the above steps to get the final answer.

In [17]:
dfPattern = spark.read.csv('nyc_restaurant_pattern.csv', 
                           header=True, escape='"')

dfG = dfPattern.select('safegraph_place_id', 'date_range_start', 'visits_by_day', 'location_name', 'street_address', 'city')


def expandVisits(date_range_start, visits_by_day):
    '''
    This function returns a Python dict{datetime.date: int} where:
      key   : a datetime.date, e.g. datetime.date(2020, 3, 17)
      value : the number of visits for that day
    '''
    date = date_range_start.split('T')[0].split('-')
    year = int(date[0])
    month = int(date[1])
    day = int(date[2])
    vbd = visits_by_day[1:-1].split(',')
    mydict = dict()

    week_day_count = 0
    for i in vbd:
      mydict[datetime.date(year, month, day)+datetime.timedelta(days=week_day_count)] = int(i)
      week_day_count += 1
    
    return mydict
    

udfExpand = F.udf(expandVisits, MapType(DateType(), IntegerType()))
dfG = dfG.select('safegraph_place_id',
                 F.explode(udfExpand('date_range_start', 'visits_by_day')) \
                    .alias('date', 'visits'), \
                 'location_name', 'street_address', 'city')

dfG = dfG.filter(dfG["date"] >= datetime.date(2020,4,1))

dfG = dfG.groupBy('safegraph_place_id').agg({'visits':'max', 'location_name':'first', 'street_address':'first', 'city':'first'}) \
        .withColumnRenamed('max(visits)', 'max_visits') \
        .withColumnRenamed('first(location_name)', 'location_name') \
        .withColumnRenamed('first(street_address)', 'street_address') \
        .withColumnRenamed('first(city)', 'city')

      
dfG = dfG.filter(dfG.max_visits >= 50)

dfG = dfG.drop('max_visits')


dfH = dfPattern.select('safegraph_place_id', 'date_range_start', 'bucketed_dwell_times')
dfH = dfH.withColumn("date_range_start", F.split(dfH.date_range_start, 'T').getItem(0)).withColumnRenamed('date_range_start', 'date')
dfH = dfH.filter((dfH.date == '2020-03-02') | (dfH.date == '2020-03-30'))
dfH = dfH.withColumn("date", F.to_date(dfH.date, 'yyyy-MM-dd'))

def medianDwellBucket(dwells):
  dwells = json.loads(dwells)
  del dwells['>240']
  total = sum(dwells.values())
  middle = total/2
  tmp = 0
  for key in dwells:
    tmp = tmp + dwells[key]

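    if tmp > middle:
      return key

  # Every visit fell in the excluded '>240' bucket (or none were recorded),
  # so the median cannot be determined
  return 'N/A'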

udfMedian = F.udf(lambda x: medianDwellBucket(x), StringType())

dfJ = dfH.withColumn('median_dwell', udfMedian('bucketed_dwell_times')) \
    .select('safegraph_place_id', 'date', 'median_dwell')

dfK = dfJ.join(dfG, 'safegraph_place_id')
dfL = dfK.groupBy('safegraph_place_id') \
    .agg(F.first('location_name').alias('location_name'), 
         F.first('street_address').alias('street_address'),
         F.first('city').alias('city'), 
         F.map_from_arrays(F.collect_list('date'), 
                           F.collect_list('median_dwell')).alias('dwells'))
    
dfM = dfL \
    .withColumn('March', dfL.dwells[datetime.date(2020,3,2)]) \
    .withColumn('April', dfL.dwells[datetime.date(2020,3,30)]) \
    .drop('safegraph_place_id', 'dwells') \
    .fillna('N/A', ['March', 'April'])


print('\n'.join(sorted(dfM.rdd.map(lambda x: ','.join(x)).collect())))

3 In 1 Kitchen,4902 Fort Hamilton Pkwy,Brooklyn,21-60,21-60
Agape Cafe,655 W 34th St,New York,21-60,21-60
Buffalo Wild Wings,632 Gateway Dr,Brooklyn,21-60,11-20
Burger King,2800 Hylan Blvd,Staten Island,11-20,11-20
Cafe Deli cious,491 1st Ave,New York,21-60,21-60
Cinnabon,1313 Broadway,New York,11-20,11-20
Dunkin',150 B Greaves Laneevergreen Plaza,New York,11-20,21-60
Dunkin',1752 Shore Pkwybjs Wholesale Club,New York,21-60,21-60
Dunkin',2449 Veterans Rd Wshop Rite,New York,21-60,21-60
Dunkin',590 Gateway Dr,Brooklyn,21-60,21-60
Dunkin',625 Atlantic Aveatlantic Center Mall,New York,21-60,21-60
Dunkin',6620 Avenue U,Brooklyn,11-20,11-20
Food Express Truck,2501 Forest Ave Across From Home Depot,New York,11-20,21-60
Golden Krust Caribbean Bakery and Grill,1364 Pennsylvania Ave,Brooklyn,11-20,11-20
Harlem Tavern,2153 Frederick Douglass Blvd,New York,11-20,11-20
Hutong,731 Lexington Ave,New York,21-60,21-60
Khan Express,1275 York Ave,New York,21-60,21-60
King Cab Halal Food,10th Ave & 28th 