# SafeGraph foot traffic data processing (batch version)
Outputs foot_traffic_time_series.csv, a time series of the number of daily visits to each place.
Information about each place is written to places_info.csv.

Requires Spark and pandas

SafeGraph information about their data:
https://docs.google.com/spreadsheets/u/1/d/1UNWvPzkUTTlXBZ6M6iGhM_7sr8h-MxsZdE7iOszkAmk/htmlview#

Open the latest week's file and keep only the rows whose zip code is listed in Zip_Codes__LA_County_.txt

In [39]:
from pyspark.sql import SparkSession
import os
import pandas as pd
import time

# Spark session for the whole notebook; the master is set to "local[12]".
ss = SparkSession \
    .builder \
    .appName("Foot_Traffic") \
    .config("spark.master", "local[12]") \
    .getOrCreate()

# Wall-clock start; the last cell subtracts this to report total running time.
start_time = time.time()
file = "2020-05-11-weekly-patterns.csv"
# No schema is supplied, so Spark reads every column (postal_code included)
# as a string, which is what the isin() comparison below relies on.
main_df = ss.read.csv(file, header=True)

# Presumably one zip code per line. A separate handle name keeps `file`
# bound to the CSV path instead of rebinding it to a closed file object.
with open("Zip_Codes__LA_County_.txt") as zip_file:
    zip_codes = zip_file.read().splitlines()
# Keep only the places whose postal code appears in the zip-code list.
main_df = main_df.filter(main_df.postal_code.isin(zip_codes))
main_df.show()

+--------------------+--------------------+--------------------+-----------------+------+-----------+----------------+--------------------+---------------+--------------------+--------------------+----------------+------------------+--------------------+--------------------+------------+--------------------+--------------------+-------------------------+--------------------+--------------------+--------------------+----------------------+-----------------------+--------------------+
|  safegraph_place_id|       location_name|      street_address|             city|region|postal_code|iso_country_code| safegraph_brand_ids|         brands|    date_range_start|      date_range_end|raw_visit_counts|raw_visitor_counts|       visits_by_day| visits_by_each_hour|     poi_cbg|   visitor_home_cbgs|visitor_daytime_cbgs|visitor_country_of_origin|  distance_from_home|        median_dwell|bucketed_dwell_times|related_same_day_brand|related_same_week_brand|         device_type|
+--------------------+--

Selecting the columns of interest

In [40]:
# Narrow the frame to the columns used by the later cells.
main_df = main_df.select([
    'safegraph_place_id',
    'location_name',
    'street_address',
    'city',
    'postal_code',
    'safegraph_brand_ids',
    'brands',
    'raw_visit_counts',
    'raw_visitor_counts',
    'visits_by_day',
    'median_dwell',
])
# Place id plus this week's daily-visit string; earlier weeks are joined
# onto this frame further down.
this_week_df = main_df.select(['safegraph_place_id', 'visits_by_day'])
this_week_df.show()

+--------------------+--------------------+
|  safegraph_place_id|       visits_by_day|
+--------------------+--------------------+
|sg:0327ad5802114d...| [11,13,10,11,8,1,2]|
|sg:03eaa2b5b78646...|    [2,2,5,4,5,11,5]|
|sg:043e398a8ff240...|     [0,4,4,2,3,2,6]|
|sg:0502c14eeaac4a...|     [0,1,1,0,0,0,0]|
|sg:05e114ea81c04f...|[17,11,16,14,10,1...|
|sg:06420245d43f4b...|     [1,2,0,1,0,2,1]|
|sg:088bb50e99ae44...|    [1,8,11,5,6,9,1]|
|sg:08e842e0a28642...|[466,517,508,537,...|
|sg:09503ad6209a48...|    [6,7,8,8,8,23,9]|
|sg:0aef1772c90b45...|     [0,0,1,0,1,0,0]|
|sg:0af5cecfb08847...|     [0,3,3,4,3,2,1]|
|sg:0b57167bb68146...|     [2,2,2,1,2,0,0]|
|sg:0c8a94a3dc0a4b...|     [3,1,1,4,2,0,2]|
|sg:0d39eba90d794a...|     [3,1,4,2,1,3,0]|
|sg:0ee32c6bd1444f...|     [2,4,0,1,1,0,1]|
|sg:0f41c833a6e244...|     [1,0,0,0,0,0,1]|
|sg:10100a54039343...|     [2,0,0,0,0,0,0]|
|sg:114c164314b04b...|     [2,1,0,2,4,2,4]|
|sg:12847410edfd46...|     [4,7,4,3,4,0,0]|
|sg:137e41754b6944...|     [1,0,

Drop visits_by_day (and the brand columns) from main_df and write the result to places_info.csv

In [41]:
# Keep only the per-place descriptive columns; visits_by_day and the brand
# columns are not written to places_info.csv.
main_df = main_df.select('safegraph_place_id', 'location_name', 'street_address', 'city', 'postal_code',
                         'raw_visit_counts', 'raw_visitor_counts', 'median_dwell')
main_df_pd = main_df.toPandas()
# to_csv does not create missing directories, so make sure "output" exists.
os.makedirs("output", exist_ok=True)
main_df_pd.to_csv(os.path.join("output", "places_info.csv"), index=False)

Function to join the previous week's data to the current data. Call it in a loop, once for every earlier weekly file

In [42]:
def join_last_week_data(last_week_file, this_week_df, zip_codes):
    """Prepend an earlier week's daily visits to the accumulated series.

    Reads ``last_week_file`` with the module-level Spark session ``ss``,
    keeps the rows whose ``postal_code`` is in ``zip_codes``, inner-joins
    them to ``this_week_df`` on ``safegraph_place_id`` and concatenates the
    two bracketed visit strings into one (earlier week first).

    Args:
        last_week_file: Path of the earlier weekly patterns CSV.
        this_week_df: Spark DataFrame with ``safegraph_place_id`` and either
            ``visits_by_day`` (first call) or ``visits_by_day_current``
            (result of a previous call).
        zip_codes: Postal codes to keep.

    Returns:
        Spark DataFrame with columns ``safegraph_place_id`` and
        ``visits_by_day_current``. Places missing from either side are
        dropped by the inner join. A null visits string on either side
        yields a null result for that place.
    """
    # Local import so the function does not depend on an extra file-level import.
    from pyspark.sql import functions as F

    last_week_df = ss.read.csv(last_week_file, header=True)
    last_week_df = last_week_df.filter(last_week_df.postal_code.isin(zip_codes)) \
        .select('safegraph_place_id', 'visits_by_day')
    # On the first call the accumulated column is still named visits_by_day;
    # on later calls the rename is a no-op because that column no longer exists.
    this_week_df = this_week_df.withColumnRenamed("visits_by_day", "visits_by_day_current")
    joined = this_week_df.join(last_week_df, how='inner', on='safegraph_place_id')
    # "[a,b][c,d]" -> "[a,b,c,d]". Native column expressions replace the
    # RDD/lambda round trip: they also work when the join result is empty
    # (toDF schema inference on an empty RDD raises) and avoid shipping
    # every row through Python.
    merged = F.regexp_replace(
        F.concat(F.col("visits_by_day"), F.col("visits_by_day_current")),
        r"\]\[",
        ",",
    )
    return joined.select("safegraph_place_id", merged.alias("visits_by_day_current"))

In [43]:
# Earlier weekly files, newest first: each join prepends one more week, so
# the finished series runs from the oldest week to the latest.
last_week_file_list = [
    "2020-05-04-weekly-patterns.csv",
    "2020-04-27-weekly-patterns.csv",
    "2020-04-20-weekly-patterns.csv",
]
for last_week_file in last_week_file_list:
    this_week_df = join_last_week_data(
        last_week_file,
        this_week_df,
        zip_codes,
    )
this_week_df.show()

+--------------------+---------------------+
|  safegraph_place_id|visits_by_day_current|
+--------------------+---------------------+
|sg:0021c44e821f44...| [0,2,1,0,2,0,4,0,...|
|sg:004b49c8b2284f...| [12,11,10,15,13,1...|
|sg:01088372d88548...| [4,1,3,3,5,3,0,6,...|
|sg:01be247ce34947...| [6,7,4,4,4,8,6,2,...|
|sg:0330311d595045...| [1,0,0,0,0,0,0,1,...|
|sg:03ab170d4e5d41...| [3,0,2,6,2,4,3,6,...|
|sg:045d96f96f6f47...| [2,2,7,2,8,3,2,4,...|
|sg:0478e10e2fbb48...| [0,4,4,1,2,2,5,2,...|
|sg:0557a1065b9047...| [0,0,1,0,0,0,2,0,...|
|sg:05c00ab60e2444...| [2,3,2,3,3,9,2,3,...|
|sg:060f291360734a...| [2,1,4,2,1,2,1,4,...|
|sg:08c7eebee07b45...| [1,2,2,1,4,0,0,2,...|
|sg:08efd9dac9cf42...| [3,0,1,1,2,1,0,0,...|
|sg:090cf1c0f62e41...| [2,4,5,2,1,0,0,5,...|
|sg:0a6571a06d7446...| [6,9,6,6,7,9,0,10...|
|sg:0afb6633a4224e...| [1,2,0,0,0,0,0,0,...|
|sg:0b448c58611749...| [4,3,5,4,6,7,15,5...|
|sg:0c0b6d5f828a48...| [1,5,2,7,2,1,3,2,...|
|sg:0c4e86749cd244...| [3,1,7,4,3,5,3,9,...|
|sg:0c66f4

Convert the Spark DataFrame to pandas, remove the brackets, and split the visits string on commas into one column per day

In [44]:
# Collect the joined Spark DataFrame into a pandas DataFrame for the
# pandas string operations in the next cell.
traffic_df = this_week_df.toPandas()

In [45]:
# Strip the surrounding brackets and split the comma-separated counts into
# one column per day. The pattern is a raw string: "\[" and "\]" are invalid
# escape sequences in a normal string literal and trigger a warning.
temp = traffic_df.visits_by_day_current.str.replace(r"[\[\]]", "", regex=True).str.split(",", expand=True)
traffic_df1 = pd.concat([traffic_df.safegraph_place_id, temp], axis=1)
# Wide -> long: one row per (place, day). "Day" is the column position
# produced by the split; "Visits" is the count, still stored as a string.
traffic_df1 = traffic_df1.melt(id_vars="safegraph_place_id",
        var_name="Day",
        value_name="Visits")
traffic_df1

Unnamed: 0,safegraph_place_id,Day,Visits
0,sg:0021c44e821f44a2a33e8ebff6dfd951,0,0
1,sg:004b49c8b2284f6494dab410fbc60508,0,12
2,sg:01088372d8854814a713acef83c6e6b7,0,4
3,sg:01be247ce34947dfa938f3fe9a8bf98d,0,6
4,sg:0330311d595045eab8c25f1215f76970,0,1
...,...,...,...
2751219,sg:fc8eea50d26f4dbdb5da6de5718d9e4d,27,1
2751220,sg:fd2e05099ca544839156a4d979177192,27,2
2751221,sg:fd5e17a3f62b4f0485e1958116a847c6,27,1
2751222,sg:fd785b0ceb1d4a6d92abe0438100f952,27,0


In [46]:
# to_csv does not create missing directories, so make sure "output" exists.
os.makedirs("output", exist_ok=True)
# Write the long-format (place, day, visits) table without the pandas index.
traffic_df1.to_csv(os.path.join("output", "foot_traffic_time_series.csv"), index=False)

Total Running time (s)

In [47]:
# Seconds elapsed since start_time was recorded in the first code cell.
time.time() - start_time

253.51200008392334