In [None]:
from gql import gql, Client
from gql.transport.requests import RequestsHTTPTransport
import pyspark.sql.functions as F
from pyspark.sql.types import (ArrayType, LongType, StringType, StructField, StructType, DoubleType, MapType)
import json
import jsonlines
import pandas as pd
import os

In [None]:
# SafeGraph GraphQL endpoint used by the transport below.
url = "https://api.safegraph.com/v2/graphql"

In [None]:
%run ./keys

In [None]:
# Initiate API connection
# Initiate the API connection: an HTTP transport carrying the API key header, wrapped
# in a gql Client that fetches the server schema on connect.
# NOTE(review): `sfkey` is presumably defined by the ./keys notebook run earlier — confirm.
transport = RequestsHTTPTransport(
    url=url,
    headers={
        'Content-Type': 'application/json',
        'apikey': sfkey,
    },
    verify=True,
    retries=3,
)
client = Client(
    transport=transport,
    fetch_schema_from_transport=True,
)

In [None]:
# GraphQL query template for the SafeGraph search API. The --TOKEN-- placeholders are
# substituted with str.replace before each request:
#   --FILTERS--                extra search filters (the paging loop replaces it with '')
#   --STATENAME--              value for address.region
#   --DATESTART--/--DATEEND--  date window passed to monthly_patterns
#   --ENDCURSER--              pagination cursor passed as `after` (spelling must match the
#                              .replace() call that fills it)
# Each page asks for up to 500 results plus pageInfo (hasNextPage, endCursor) for paging.
query_sg = """query {
  search(filter: { 
     --FILTERS--
    address: {
      region: "--STATENAME--"
    }
  }){
    places {
      results(first: 500 after: "--ENDCURSER--") {
        pageInfo { hasNextPage, endCursor}
        edges {
          node {
            monthly_patterns (start_date: "--DATESTART--" end_date: "--DATEEND--") {
              placekey
              parent_placekey
              location_name
              street_address
              city
              region
              postal_code
              iso_country_code
              date_range_start
              date_range_end
              raw_visit_counts
              raw_visitor_counts
              visits_by_day
              device_type
              poi_cbg
              visitor_home_cbgs
              visitor_home_aggregation
              visitor_daytime_cbgs
              visitor_country_of_origin
              distance_from_home
              median_dwell
              bucketed_dwell_times
              related_same_day_brand
              related_same_month_brand
              normalized_visits_by_total_visits
              normalized_visits_by_state_scaling
              normalized_visits_by_total_visitors
              normalized_visits_by_region_naics_visits
              normalized_visits_by_region_naics_visitors
            }
          }
        }
      }
    }
  }
}
"""

In [None]:
# Create the driver-local scratch directory for API output and its DBFS destination.
# Both calls tolerate an existing directory, so the cell can be re-run
# (plain os.mkdir raises FileExistsError the second time).
os.makedirs('/tmp/api_challenge', exist_ok=True)
dbutils.fs.mkdirs("/FileStore/api_challenge")

In [None]:
# Page through the search results (state RI, 2022-01-01 .. 2022-04-01) and write each
# place's monthly_patterns payload as one JSON line.
# The output file is opened once in write mode so re-running this cell replaces the
# file instead of appending a second copy of every page.
nextPaging = ''
with jsonlines.open("/tmp/api_challenge/api_2.jl", 'w') as writer:
    while True:
        query_sg_text = (
            query_sg
            .replace("--STATENAME--", "RI")
            .replace("--FILTERS--", '')
            .replace("--DATESTART--", "2022-01-01")
            .replace("--DATEEND--", "2022-04-01")
            .replace("--ENDCURSER--", nextPaging)
        )
        sgIter = client.execute(gql(query_sg_text))
        results = sgIter['search']['places']['results']
        pageInformation = results['pageInfo']
        nextPaging = pageInformation['endCursor']
        # Each edge wraps a place node; keep only its monthly_patterns payload.
        sgIter = [edge['node']['monthly_patterns'] for edge in results['edges']]
        writer.write_all(sgIter)
        # Stop when the API reports no further page, or gives no cursor to continue from
        # (checking hasNextPage avoids an extra request and a possible endless loop).
        if not pageInformation['hasNextPage'] or nextPaging is None:
            break

In [None]:
# Check that the JSON Lines output exists, via the local filesystem and via dbutils.
# dbutils addresses driver-local paths with the lowercase `file:` scheme.
print(os.listdir("/tmp/api_challenge"))
dbutils.fs.ls("file:/tmp/api_challenge")

In [None]:
# Copy the whole driver-local output directory (including api_2.jl) to DBFS so it can be
# read with spark.read below; a separate single-file copy would be redundant.
dbutils.fs.cp("file:/tmp/api_challenge/", "dbfs:/FileStore/api_challenge/", recurse=True)
dbutils.fs.ls("/FileStore/api_challenge")

In [None]:
# Load every JSON Lines file under the DBFS directory into one DataFrame.
# The schema is inferred because no `schema` variable is defined in this notebook.
# TODO: define an explicit StructType for the monthly_patterns fields to pin column types.
dfsg = spark.read.json("dbfs:/FileStore/api_challenge")

In [None]:
%sql
-- Create the target database; IF NOT EXISTS keeps the notebook re-runnable.
CREATE DATABASE IF NOT EXISTS api1

In [None]:
# Drop rows without a placekey, then keep one row per (placekey, date_range_start);
# the input directory may contain overlapping files. Each stage is cached only long
# enough to be counted, then released, so neither cache stays pinned after this cell.
dfsg_nonnull = dfsg.na.drop(subset=["placekey"]).persist()
print(f"Count with all json files: {dfsg_nonnull.count()}")

dfsg_dedup = dfsg_nonnull.dropDuplicates(['placekey', 'date_range_start']).persist()
print(f"Count after duplicates: {dfsg_dedup.count()}")

# Keep a handle on each cached frame so both can be released.
dfsg_nonnull.unpersist()
dfsg_dedup.unpersist()
dfsg = dfsg_dedup

print("done")

In [None]:
# Save the cleaned frame as a Delta table in 10 partitions. Mode "overwrite" lets the
# notebook be re-run; the default save mode raises if the table already exists.
dfsg.repartition(10).write.format("delta").mode("overwrite").saveAsTable("api1.test_table")

In [None]:
# Load the saved table back as a DataFrame (equivalent to SELECT * FROM api1.test_table).
df = spark.table("api1.test_table")