# First PYSpark DataFrame Creation

In this file we 
* Create a SparkSession object
* Download a csv file from the web
* Read the csv as a PySpark DataFrame
* View the top 20 rows of the PySpark DataFrame
* Write the PySpark DF as a parquet file to a folder zones (with default partition 1)

In [1]:
#NOTE: 
# Run the prefect_gcp_block.py and register the block
# prefect block register --file prefect_gcp_block.py

# spark related packages
import pyspark
from pyspark.sql import SparkSession

In [2]:
#other utility packages
from decouple import config, AutoConfig
import os
import requests
from datetime import datetime, timedelta
from pathlib import Path
from datetime import datetime, timedelta

In [None]:
# prefect related packages
from prefect import flow, task
from prefect_gcp.cloud_storage import GcsBucket


In [None]:
# Credentials
config = AutoConfig(search_path='.env')
GCS_BUCKET_BLOCK = config("GCS_BUCKET_BLOCK")


In [1]:
def download_data(dataset_url:str, data_folder:str, filename_csv:str): -> Path
    output_dir = Path(data_folder)
    output_dir.mkdir(parents=True, exist_ok=True)
    filename = f'{datetime.strptime(datetime.today(), "%Y-%m-%d")}_{filename_csv}'
    filepath_local = Path(f'{data_folder}/{filename}')
    # download the csv
    os.system(f"wget {dataset_url} -O {filepath_local}")
    return filepath_local

In [None]:
@task(retries=3)
def write_to_gcs(path:Path) -> None:
    """Loads the dataset into GCS"""
    gcs_block = GcsBucket.load(GCS_BUCKET_BLOCK)
    gcs_block.upload_from_path(from_path=path, to_path=Path(path).as_posix()) # To handle the backslash that is being changed when writing to GCS
    return 

In [None]:
def read_data(data_path:Path)->None:
    # Create PySpark SparkSession
    spark = SparkSession.builder.master("local[*]").appName('test').getOrCreate()
    
    

In [11]:
os.chdir('/home/sanyashireen/sf_eviction')
os.getcwd()

'/home/sanyashireen/sf_eviction'

In [29]:
filename = '5cei-gny5'
data_url = f"https://data.sfgov.org/resource/{filename}.csv"
filename_csv ='eviction.csv'
data_dir = Path("data_eviction")


download_data(data_url, data_dir, filename_csv)

## Downloading csv data

In [23]:
data_loc_csv

'/home/sanyashireen/sf_eviction/data/eviction_data.csv'

In [28]:
!wget https://data.sfgov.org/resource/5cei-gny5.csv -P '/home/sanyashireen/sf_eviction/data/'

--2023-03-08 19:02:41--  https://data.sfgov.org/resource/5cei-gny5.csv
Resolving data.sfgov.org (data.sfgov.org)... 52.206.140.199, 52.206.140.205, 52.206.68.26
Connecting to data.sfgov.org (data.sfgov.org)|52.206.140.199|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [text/csv]
Saving to: ‘data_dir/5cei-gny5.csv’

5cei-gny5.csv           [ <=>                ] 360.33K  --.-KB/s    in 0.09s   

2023-03-08 19:02:43 (4.08 MB/s) - ‘data_dir/5cei-gny5.csv’ saved [368982]



In [32]:
!wc -l data/5cei-gny5.csv

2195 data/5cei-gny5.csv


In [33]:
# Reading the CSV as a PySpark DF
df_csv = spark.read.option("header", "true").csv(f'{data_loc_csv}')

In [35]:
df_csv.show(5)

+-----------+--------------------+-------------+-----+-----+--------------------+-----------+------+--------+-----------+-----------------------+-------------+--------------------+-------------+----------+-------------------+-----------------+--------------------+----------------+------------------+-----------+-------------+----------------+-----------+-------------------+----------------+-------------------+--------------------+---------------+--------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+
|eviction_id|             address|         city|state|  zip|           file_date|non_payment|breach|nuisance|

## Downloading json data

In [31]:
# Download the file from the web into the data folder
!wget https://data.sfgov.org/resource/5cei-gny5.json -P '/home/sanyashireen/sf_eviction/data/'

--2023-03-08 19:10:18--  https://data.sfgov.org/resource/5cei-gny5.json
Resolving data.sfgov.org (data.sfgov.org)... 52.206.140.205, 52.206.68.26, 52.206.140.199
Connecting to data.sfgov.org (data.sfgov.org)|52.206.140.205|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [application/json]
Saving to: ‘/home/sanyashireen/sf_eviction/data/5cei-gny5.json’

5cei-gny5.json          [ <=>                ]   1.02M  6.79MB/s    in 0.2s    

2023-03-08 19:10:19 (6.79 MB/s) - ‘/home/sanyashireen/sf_eviction/data/5cei-gny5.json’ saved [1069481]



In [17]:
# Read the csv as a PySpark DF object
df = spark.read.json(data_loc_json, multiLine=True)

In [20]:
type(df)

pyspark.sql.dataframe.DataFrame

In [18]:
df.printSchema()

root
 |-- :@computed_region_26cr_cadq: string (nullable = true)
 |-- :@computed_region_6ezc_tdp2: string (nullable = true)
 |-- :@computed_region_6pnf_4xz7: string (nullable = true)
 |-- :@computed_region_6qbp_sg9q: string (nullable = true)
 |-- :@computed_region_9jxd_iqea: string (nullable = true)
 |-- :@computed_region_ajp5_b2md: string (nullable = true)
 |-- :@computed_region_bh8s_q3mv: string (nullable = true)
 |-- :@computed_region_fyvs_ahh9: string (nullable = true)
 |-- :@computed_region_h4ep_8xdi: string (nullable = true)
 |-- :@computed_region_jwn9_ihcz: string (nullable = true)
 |-- :@computed_region_p5aj_wyqh: string (nullable = true)
 |-- :@computed_region_pigm_ib2e: string (nullable = true)
 |-- :@computed_region_qgnn_b9vv: string (nullable = true)
 |-- :@computed_region_rxqg_mtj9: string (nullable = true)
 |-- :@computed_region_yftq_j783: string (nullable = true)
 |-- access_denial: boolean (nullable = true)
 |-- address: string (nullable = true)
 |-- breach: boolean (nul

In [19]:
df.show(5)

23/03/08 18:51:12 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+-------------+--------------------+------+-------------------+-------------+---------------+----------------+----------------+----------+-----------+--------------------+-----------+-----------------------+--------------------+-------------------+-----------+-------------+----------------+--------------------+-----------+--------+-----------+-------------+------------------+--------------------+-

In [7]:
# View the top 20 rows of the PySpark DF
df.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [8]:
# Use Spark and write the PySpark DF to the folder 'zones' as a parquet file where it will be written as partitons
# if number of partitions is not defined the default paritions in 1
df.write.parquet('zones')

                                                                                

In [11]:
# We can see the folder zones was created to write the parquet file into
!ls -lh

total 28K
-rw-rw-r-- 1 sanyashireen sanyashireen 6.8K Feb 22 01:52 Untitled.ipynb
-rw-rw-r-- 1 sanyashireen sanyashireen  13K Aug 17  2016 taxi+_zone_lookup.csv
drwxr-xr-x 2 sanyashireen sanyashireen 4.0K Feb 22 01:54 zones


## Downloading data using the API

In [2]:
config = AutoConfig(search_path='.env') # <-- .env file located next to manage.py
API_TOKEN = config("API_TOKEN")
API_KEY_ID = config("API_KEY_ID")
API_KEY_SECRET = config("API_KEY_SECRET")

In [6]:
def parse_metadata(header):
    """Parses metadata from API response."""
    try:
        metadata = {
            'api-call-date': header['Date'],
            'content-type': header['Content-Type'],
            'source-last-modified': header['X-SODA2-Truth-Last-Modified'],
            'fields': header['X-SODA2-Fields'],
            'types': header['X-SODA2-Types']
        }
    except KeyError:
        metadata = {'KeyError': 'Metadata missing from header, see error log.'}
    return metadata        

In [17]:
# download the json by supplying the api token in the header
def get_json(endpoint, headers):
    """Calls API, requests all created & updated records >/= 180 days."""
    headers['Accept'] = 'application/json' # csv?
    pull_date = (datetime.now() - timedelta(days=180)).strftime("%Y-%m-%dT%H:%M:%S") # year, month, day, hour, minute, seconds, microseconds
    combined = []
    offset, counter = 0, 1
    error = False
    '''
    while True:
        params = f"""$query=SELECT:*,* WHERE :created_at >= '{pull_date}' OR :updated_at >= '{pull_date}' 
                 ORDER BY :id LIMIT 177600"""
        response = requests.get(endpoint, headers=headers, params=params)
        if response.status_code != 200:
            error = f'api_request-endpoint|{endpoint}|params|{params}|'
            break
        captured = response.json()
        if len(captured) == 0:
            break
        combined.extend(captured)
        offset = 10000 * counter
        counter += 1
    if error:
        log_exit(filename=error, api_error=response.status_code)
        return -1, -1
    '''
    params = f"""$query=SELECT:*,* WHERE :created_at >= '{pull_date}' OR :updated_at >= '{pull_date}' ORDER BY :id LIMIT 177600"""
    # response has two parts .json() and .headers https://www.w3schools.com/python/ref_requests_response.asp
    response = requests.get(endpoint, headers=headers, params=params)
    captured = response.json()
    combined.extend(captured)

    metadata = parse_metadata(response.headers)
    print('get_json complete')
    return metadata, combined

In [18]:
SODA_url = 'https://data.sfgov.org/resource/5cei-gny5'
SODA_headers = {
    'keyId': API_KEY_ID,
    'keySecret': API_KEY_SECRET
}
head, content = get_json(SODA_url, SODA_headers)

get_json complete


In [9]:
type(head)

dict

In [15]:
type(content)

list

In [19]:
len(content)

177578

In [11]:
print(head)

{'api-call-date': 'Thu, 09 Mar 2023 14:39:22 GMT', 'content-type': 'application/json;charset=utf-8', 'source-last-modified': 'Tue, 28 Feb 2023 14:40:51 GMT', 'fields': '[":id",":created_at",":updated_at",":version",":@computed_region_6qbp_sg9q",":@computed_region_qgnn_b9vv",":@computed_region_26cr_cadq",":@computed_region_ajp5_b2md",":@computed_region_fyvs_ahh9",":@computed_region_p5aj_wyqh",":@computed_region_rxqg_mtj9",":@computed_region_yftq_j783",":@computed_region_bh8s_q3mv",":@computed_region_9jxd_iqea",":@computed_region_6ezc_tdp2",":@computed_region_6pnf_4xz7",":@computed_region_h4ep_8xdi",":@computed_region_pigm_ib2e",":@computed_region_jwn9_ihcz","eviction_id","address","city","state","zip","file_date","non_payment","breach","nuisance","illegal_use","failure_to_sign_renewal","access_denial","unapproved_subtenant","owner_move_in","demolition","capital_improvement","substantial_rehab","ellis_act_withdrawal","condo_conversion","roommate_same_unit","other_cause","late_payments","

In [4]:
from datetime import datetime

In [5]:
datetime.now()

datetime.datetime(2023, 3, 22, 3, 20, 40, 633295)

In [7]:
datetime.now().year

2023

In [8]:
datetime.now().day

22

In [10]:
datetime.now().strftime("%Y-%m-%d")

TypeError: strftime() missing required argument 'format' (pos 1)