## Requirements / Config


In [None]:
import os
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Service-account JSON key handed to the GCS connector.
# NOTE(review): the connector reads `...service.account.json.keyfile` as a file
# path; presumably it must be a local path (a gs:// URI may not be readable
# before the connector itself is authenticated) — confirm on the target cluster.
service_account_key_path = "gs://auth_service_key_seriousprojectid/serious-unison-441416-j6-556d0d88d23e.json"

# Build (or reuse) the Spark session: register the gs:// filesystem classes,
# pull in the BigQuery connector package, and widen plan string output.
spark = (
    SparkSession.builder
    .appName("NYC fetch Updatedrecords")
    .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", service_account_key_path)
    .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
    .config("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
    .config("spark.jars.packages", "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.23.2")
    .config("spark.sql.debug.maxToStringFields", "2000")
    .getOrCreate()
)

# GCS prefixes for the raw extract and the transformed output.
gcs_bucket = "gs://nyc-311-requests/raw"
processed_gcs_bucket = "gs://nyc-311-requests/transformed"

### Load the Parquet file for transformation

In [None]:
# Step 1: Load the raw Parquet extract from the raw-data prefix.
# Build the path from gcs_bucket instead of repeating the prefix literal.
raw_parquet_path = f"{gcs_bucket}/nyc311-datafetch-2025.parquet"
df = spark.read.parquet(raw_parquet_path)
df.show(1)  # quick sanity peek at a single row
print('successfully loaded the parquet file')

## DATA CLEANING

In [None]:
# Remove rows without a primary key; every split table below is keyed on unique_key.
tran_df = df.filter(df['unique_key'].isNotNull())

# Parse the date columns into timestamps. With Spark's non-ANSI defaults,
# to_timestamp yields null for values that don't match the pattern, so nulls
# appearing here after conversion can indicate malformed source values.
date_columns = [
    'created_date',
    'closed_date',
    'resolution_action_updated_date',
    'due_date',
]
for column in date_columns:
    tran_df = tran_df.withColumn(column, to_timestamp(col(column), "yyyy-MM-dd'T'HH:mm:ss.SSS"))

# Cast coordinates to double so they can be range-checked numerically.
geo_columns = [
    'latitude',
    'longitude',
]
for column in geo_columns:
    tran_df = tran_df.withColumn(column, col(column).cast("double"))

# Normalise location_type casing (nulls stay null until the fill below).
tran_df = tran_df.withColumn('location_type', lower(col('location_type')))

# Missing borough gets an explicit placeholder value.
tran_df = tran_df.fillna({'borough': 'Unspecified'})

# Replace nulls in descriptive string columns with "N/A".
# NOTE(review): fillna only fills columns whose type matches the value, so
# any of these that are not string-typed (e.g. incident_zip / bbl, if stored
# as numbers) are silently left untouched — confirm the source schema.
null_string_columns = [
    'descriptor',
    'resolution_description',
    'location_type',
    'facility_type',
    'vehicle_type',
    'taxi_company_borough',
    'taxi_pick_up_location',
    'bridge_highway_direction',
    'bridge_highway_name',
    'bridge_highway_segment',
    'road_ramp',
    'address_type',
    'street_name',
    'city',
    'incident_address',
    'incident_zip',
    'bbl',
]
# A single fillna with a column->value mapping covers every column at once.
# The previous per-column loop re-applied this same full mapping N times.
tran_df = tran_df.fillna({name: "N/A" for name in null_string_columns})

# Inspect the schema of the cleaned data.
tran_df.printSchema()


## DATA TRANSFORMATION

In [None]:
# Split out the identification / case-lifecycle attributes
# (key, status, and the date columns parsed during cleaning).
Identification_columns = [
    'unique_key',
    'created_date',
    'status',
    'closed_date',
    'resolution_action_updated_date',
    'due_date',
    'resolution_description',
    'open_data_channel_type',
]

identification_df = tran_df.select(*Identification_columns)
identification_df.show(10)

In [None]:
# Split out the location attributes, keyed by unique_key.
# Inline comments show example values seen in the data.
location_columns = [
    'unique_key',
    'location_type',     # e.g. Street/Sidewalk
    'address_type',      # e.g. ADDRESS
    'borough',           # e.g. QUEENS
    'community_board',   # e.g. 10 QUEENS
    'street_name',       # e.g. 80 STREET
    'city',              # e.g. HOWARD BEACH
    'incident_address',  # e.g. 153-39 80 STREET
    'incident_zip',      # e.g. 11414
    'bbl',               # e.g. 4114420043
    'latitude',
    'longitude',
]

location_df = tran_df.select(*location_columns)
location_df.show(10)

In [None]:
# Split out the complaint attributes (agency, complaint kind, and
# transport / park specifics), keyed by unique_key.
complaint_columns = [
    'unique_key',
    'agency',
    'agency_name',
    'complaint_type',
    'descriptor',
    'location_type',
    'facility_type',
    'vehicle_type',
    'taxi_company_borough',
    'taxi_pick_up_location',
    'bridge_highway_direction',
    'bridge_highway_name',
    'bridge_highway_segment',
    'road_ramp',
    'park_facility_name',
    'park_borough',
]

complaint_df = tran_df.select(*complaint_columns)
complaint_df.show(10)

In [None]:
# Drop fully duplicated rows from each split table.
# DataFrames are immutable: dropDuplicates() returns a NEW DataFrame, so the
# result must be reassigned, otherwise the de-duplication is silently lost.
identification_df = identification_df.dropDuplicates()
complaint_df = complaint_df.dropDuplicates()
location_df = location_df.dropDuplicates()

## Quality checks (validations)

In [None]:
# --- Record counts: each table is a projection of the same tran_df, so the
# counts are expected to match. Print all three explicitly, because Jupyter
# only displays a cell's final expression.
identification_count = identification_df.count()
complaint_count = complaint_df.count()
location_count = location_df.count()
print(identification_count)
print(complaint_count)
print(location_count)
if not (identification_count == complaint_count == location_count):
    print("WARNING: record counts differ across the transformed tables")

# --- Null counts per column for each transformed table.
for table_df in (identification_df, complaint_df, location_df):
    table_df.select(
        [count(when(isnull(c), c)).alias(c) for c in table_df.columns]
    ).show()

# --- Primary key: list any unique_key that occurs more than once.
duplicate_count = tran_df.groupBy('unique_key').count().filter(col('count') > 1)
duplicate_count.show()

# --- Date range of created_date, computed in one job.
# min/max here are pyspark.sql.functions (via the star import at the top of
# the notebook), not the Python builtins.
identification_df.select(min('created_date'), max('created_date')).show()

# --- Nulls in created_date: either missing in the source or values that
# failed the to_timestamp conversion during cleaning.
print(identification_df.filter(col('created_date').isNull()).count())

# --- Categorical value inspection.
location_df.select('address_type').distinct().show()
location_df.select('location_type').distinct().show()
identification_df.select('status').distinct().show()
complaint_df.select('vehicle_type').distinct().show()

# --- Coordinates outside an NYC bounding box (null coordinates are not counted).
invalid_geo_count = location_df.filter(
    (col('latitude') < 40.4774) | (col('latitude') > 40.9176) |
    (col('longitude') < -74.2591) | (col('longitude') > -73.7002)
).count()
print(invalid_geo_count)

# --- Zip codes that are present (not the "N/A" placeholder) but invalid:
# not exactly five digits, or outside 10001-11699. The regex guard inside
# when() keeps the int cast from seeing non-numeric text.
zip_col = col('incident_zip')
zip_num = when(zip_col.rlike(r'^\d{5}$'), zip_col.cast('int'))
invalid_zip_count = location_df.filter(
    (zip_col != "N/A")
    & (zip_num.isNull() | (zip_num < 10001) | (zip_num > 11699))
).count()
print(invalid_zip_count)

# --- Agencies outside the known list. A null agency is flagged too, because
# ~isin() evaluates to null for null input and the filter would drop it.
valid_agencies = [
    'DEP', 'DPR', 'DOB', 'DOS', 'DOT', 'HPD', 'TLC',
    'EDC', 'NYPD', 'DOE', 'DOHMH', 'DSNY', 'DCWP', 'DHS', 'OTI'
]  # Example list of valid agencies — NOTE(review): confirm it is complete
invalid_agencies = complaint_df.filter(
    col('agency').isNull() | ~col('agency').isin(valid_agencies)
)
print(invalid_agencies.count())


### Loading data

In [None]:
# Write each transformed table to Cloud Storage as Parquet under output_path.
# coalesce(1) produces one part-file per table, so all data for a table
# flows through a single task.
# NOTE(review): mode="append" adds new files on every run, so re-running the
# notebook on the same input duplicates rows — confirm append is intended.
output_path = "gs://nyc-311-requests/transformed/"
tables_to_write = {
    "identification": identification_df,
    "location": location_df,
    "complaint": complaint_df,
}
for name, table_df in tables_to_write.items():
    table_df.coalesce(1).write.parquet(f"{output_path}{name}.parquet", mode="append")


In [None]:
# Append the identification table to BigQuery table `case_details`.
project_id = "serious-unison-441416-j6"
dataset_name = "nyc_311requests"
table_name = "case_details"  # Existing table name

# Staging location passed to the connector as temporaryGcsBucket.
# NOTE(review): the connector presumably only stages through GCS for the
# indirect write method, and expects a bucket *name* rather than a gs:// path
# with a prefix. Also confirm that the pinned connector (0.23.2) honours
# writeMethod="direct".
transformed_gcs_output_path = "gs://nyc-311-requests/transformed/jan3"

bigquery_write_options = {
    "project": project_id,
    "dataset": dataset_name,
    "table": table_name,
    "temporaryGcsBucket": transformed_gcs_output_path,
    "writeMethod": "direct",
    "createDisposition": "CREATE_IF_NEEDED",
    "writeDisposition": "WRITE_APPEND",
}

(
    identification_df.write.format("bigquery")
    .options(**bigquery_write_options)
    .mode("append")
    .save()
)

print("Data appended to table ....'case_details'.... in BigQuery successfully!")

In [None]:
# Append the location table to BigQuery table `location`.
project_id = "serious-unison-441416-j6"
dataset_name = "nyc_311requests"
table_name = "location"  # New table name

# Same connector settings as the other BigQuery loads; the staging path is
# the transformed_gcs_output_path defined in the earlier BigQuery cell.
bigquery_write_options = {
    "project": project_id,
    "dataset": dataset_name,
    "table": table_name,
    "temporaryGcsBucket": transformed_gcs_output_path,
    "writeMethod": "direct",
    "createDisposition": "CREATE_IF_NEEDED",
    "writeDisposition": "WRITE_APPEND",
}

(
    location_df.write.format("bigquery")
    .options(**bigquery_write_options)
    .mode("append")
    .save()
)

print("Data appended to table ....'location'.... in BigQuery successfully!")

In [None]:
# Append the complaint table to BigQuery table `complaint`.
project_id = "serious-unison-441416-j6"
dataset_name = "nyc_311requests"
table_name = "complaint"  # New table name

# Same connector settings as the other BigQuery loads; the staging path is
# the transformed_gcs_output_path defined in the earlier BigQuery cell.
bigquery_write_options = {
    "project": project_id,
    "dataset": dataset_name,
    "table": table_name,
    "temporaryGcsBucket": transformed_gcs_output_path,
    "writeMethod": "direct",
    "createDisposition": "CREATE_IF_NEEDED",
    "writeDisposition": "WRITE_APPEND",
}

(
    complaint_df.write.format("bigquery")
    .options(**bigquery_write_options)
    .mode("append")
    .save()
)

print("Data appended to table ....'complaint'.... in BigQuery successfully!")