In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window

poi = spark.table("safegraph_spend.poi").dropDuplicates(subset=["PLACEKEY"])
mapping = spark.table("safegraph.censusmapping").withColumnRenamed("placekey", "PLACEKEY").select("PLACEKEY", "tract").dropDuplicates(subset=["PLACEKEY"])
placekey = spark.table("passby.placekey")
visitors = spark.table("passby.visitors")

# Simple feature - number of schools in a tract

In [0]:
schools = poi \
    .join(mapping, on="PLACEKEY", how="inner") \
    .groupBy('tract') \
    .agg(count(when(col("NAICS_CODE") == 611110, 1)).alias("tract_school_count")) \
    .select('tract', 'tract_school_count') \
    .fillna({"tract_school_count": 0})

# Complex feature - most common age group in a tract

In [0]:
window_spec = Window.partitionBy('tract').orderBy(col('weight').desc())

ages = visitors \
    .select('STORE_ID', explode(visitors.VISITOR_AGE_RANGE).alias('mode_age', 'weight'))\
    .join(placekey, 'STORE_ID').join(mapping, 'PLACEKEY') \
    .groupBy('tract', 'mode_age').agg(sum('weight').alias('weight')) \
    .withColumn('rank', row_number().over(window_spec)) \
    .filter(col('rank') == 1) \
    .select('tract', 'mode_age')

In [0]:
sam_features = schools.join(ages, 'tract')
display(sam_features)

tract,tract_school_count,mode_age
1003011502,3,50_54
1013953300,0,50_54
1015001600,3,50_54
1015002200,0,50_54
1017953800,1,50_54
1023956900,3,50_54
1033020100,3,50_54
1033020704,1,50_54
1043964400,2,50_54
1055010700,1,50_54
