In [1]:
import os
from datetime import datetime
import requests

import pyspark
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, TimestampType
from pyspark.sql.functions import col
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, explode, col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType, DoubleType

import pytz
from astral import LocationInfo
from astral.sun import sun

In [9]:
spark.stop()

In [10]:
credentials_location = '/home/lulu/projects/data_talks/mage-zoomcamp/google_cloud_key.json'

spark_home = os.environ.get('SPARK_HOME')
jar_file = os.path.join(spark_home, 'lib', 'gcs-connector-hadoop3-latest.jar')
print(spark_home)
print(jar_file)

/home/lulu/spark/spark-3.5.0-bin-hadoop3
/home/lulu/spark/spark-3.5.0-bin-hadoop3/lib/gcs-connector-hadoop3-latest.jar


In [11]:
conf = SparkConf() \
    .setMaster('local[*]') \
    .setAppName('test') \
    .set("spark.jars", jar_file) \
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)

sc = SparkContext(conf=conf)

hadoop_conf = sc._jsc.hadoopConfiguration()

hadoop_conf.set("fs.AbstractFileSystem.gs.impl",  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", credentials_location)
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

spark = SparkSession.builder \
    .config(conf=sc.getConf()) \
    .getOrCreate()

In [37]:
# Must leave everything as string and cast as int or double later otherwise interpreted as null

custom_schema = StructType([
    StructField('collision_id', StringType(), True),
    StructField('crash_date', TimestampType(), True),
    StructField('crash_time', TimestampType(), True),
    StructField('zip_code', StringType(), True),
    StructField('borough', StringType(), True),
    StructField('latitude', StringType(), True), 
    StructField('longitude', StringType(), True),
    #StructField('location_latitude', DoubleType(), True),  # Expanded latitude field
    #StructField('location_longitude', DoubleType(), True),  # Expanded longitude field
    #StructField('location_human_address', StringType(), True),  # Expanded human_address field
    StructField('on_street_name', StringType(), True),
    StructField('off_street_name', StringType(), True),
    StructField('cross_street_name', StringType(), True),
    StructField('number_of_persons_injured', StringType(), True),
    StructField('number_of_pedestrians_injured', StringType(), True),
    StructField('number_of_cyclist_injured', StringType(), True),
    StructField('number_of_motorist_injured', StringType(), True),
    StructField('number_of_persons_killed', StringType(), True),
    StructField('number_of_pedestrians_killed', StringType(), True),
    StructField('number_of_cyclist_killed', StringType(), True),
    StructField('number_of_motorist_killed', StringType(), True),
    StructField('contributing_factor_vehicle_1', StringType(), True),
    StructField('contributing_factor_vehicle_2', StringType(), True),
    StructField('contributing_factor_vehicle_3', StringType(), True),
    StructField('contributing_factor_vehicle_4', StringType(), True),
    StructField('contributing_factor_vehicle_5', StringType(), True),
    StructField('vehicle_type_code1', StringType(), True),
    StructField('vehicle_type_code2', StringType(), True),
    StructField('vehicle_type_code_3', StringType(), True),
    StructField('vehicle_type_code_4', StringType(), True),
    StructField('vehicle_type_code_5', StringType(), True)
])


In [31]:
offset = 0
batch_num = 0
batch_size = 200000
batch_data = []


api_endpoint = 'https://data.cityofnewyork.us/resource/h9gi-nx95.json'

# Fetch JSON data from the API in batches
while True:
    
    # RETRIEVE DATA FROM API ENDPOINT 
    url = f'{api_endpoint}?$limit={batch_size}&$offset={offset}'
    print(url)
    
    response = requests.get(url)
    
    if not response.ok:
        raise Exception(f"Failed to fetch data from API: {response.status_code}")

    # Extract JSON content
    json_content = response.json()
    
    # Convert JSON content to RDD
    rdd = spark.sparkContext.parallelize(json_content)
    
    # Read JSON data into DataFrame
    df = spark.read.json(rdd, schema=custom_schema)
    
    # Append batch DataFrame to the list
    batch_data.append(df)
    
    offset += batch_size
    batch_num += 1
    
    if batch_num == 3:
    #if offset > 200000:
        break
    
# Union all batch DataFrames
final_df = batch_data[0]
for batch_df in batch_data[1:]:
    final_df = final_df.union(batch_df)

https://data.cityofnewyork.us/resource/h9gi-nx95.json?$limit=200000&$offset=0
https://data.cityofnewyork.us/resource/h9gi-nx95.json?$limit=200000&$offset=200000
https://data.cityofnewyork.us/resource/h9gi-nx95.json?$limit=200000&$offset=400000


In [33]:
# Get the number of rows
num_rows = final_df.count()

# Get the number of columns
num_columns = len(final_df.columns)

print("Number of rows:", num_rows)
print("Number of columns:", num_columns)

24/04/16 15:31:48 WARN TaskSetManager: Stage 5 contains a task of very large size (3834 KiB). The maximum recommended task size is 1000 KiB.

Number of rows: 600000
Number of columns: 28


                                                                                

In [42]:
# CAST THE DATA TYPES

columns_to_cast = {
    "crash_date": "date",
    "crash_time": "time",
    "latitude": DoubleType(),
    "longitude": DoubleType(),
    "number_of_persons_injured": IntegerType(),
    "number_of_pedestrians_injured": IntegerType(),
    "number_of_cyclist_injured": IntegerType(),
    "number_of_motorist_injured": IntegerType(),
    "number_of_persons_killed": IntegerType(),
    "number_of_pedestrians_killed": IntegerType(),
    "number_of_cyclist_killed": IntegerType(),
    "number_of_motorist_killed": IntegerType(),
    "collision_id": IntegerType(),
    "zip_code": IntegerType(),
}
    
for col_name, col_type in columns_to_cast.items():
    final_df = final_df.withColumn(col_name, col(col_name).cast(col_type))

ParseException: 
[UNSUPPORTED_DATATYPE] Unsupported data type "TIME".(line 1, pos 0)

== SQL ==
time
^^^


In [40]:
final_df.schema

StructType([StructField('collision_id', IntegerType(), True), StructField('crash_date', TimestampType(), True), StructField('crash_time', TimestampType(), True), StructField('zip_code', IntegerType(), True), StructField('borough', StringType(), True), StructField('latitude', DoubleType(), True), StructField('longitude', DoubleType(), True), StructField('on_street_name', StringType(), True), StructField('off_street_name', StringType(), True), StructField('cross_street_name', StringType(), True), StructField('number_of_persons_injured', IntegerType(), True), StructField('number_of_pedestrians_injured', IntegerType(), True), StructField('number_of_cyclist_injured', IntegerType(), True), StructField('number_of_motorist_injured', IntegerType(), True), StructField('number_of_persons_killed', IntegerType(), True), StructField('number_of_pedestrians_killed', IntegerType(), True), StructField('number_of_cyclist_killed', IntegerType(), True), StructField('number_of_motorist_killed', IntegerType(

In [41]:
final_df.show(5)

+------------+-------------------+-------------------+--------+--------+---------+----------+--------------------+---------------+--------------------+-------------------------+-----------------------------+-------------------------+--------------------------+------------------------+----------------------------+------------------------+-------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+------------------+------------------+-------------------+-------------------+-------------------+
|collision_id|         crash_date|         crash_time|zip_code| borough| latitude| longitude|      on_street_name|off_street_name|   cross_street_name|number_of_persons_injured|number_of_pedestrians_injured|number_of_cyclist_injured|number_of_motorist_injured|number_of_persons_killed|number_of_pedestrians_killed|number_of_cyclist_killed|number_of_motorist_killed|contributing_factor_v

24/04/16 15:34:28 WARN TaskSetManager: Stage 8 contains a task of very large size (3834 KiB). The maximum recommended task size is 1000 KiB.
