In [20]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, when
from pyspark.sql.types import StringType, IntegerType, FloatType, LongType, DoubleType

from utils.constants import Paths

Need to parse, properly-type and convert all base CSVs to .parquet


In [21]:
spark = SparkSession.builder.appName("CSV Worker").getOrCreate()

Define helper functions


In [22]:
def check_null_col(colname, df):
    return df.filter(col(colname).isNotNull()).count()

def generate_mappings(schema_str):
    lines = schema_str.split('\n')
    mappings = {}
    
    for line in lines:
        if line.strip().startswith('|--'):
            column_name = line.split(':')[0].strip().lstrip('|--')
            mappings[column_name] = StringType()
    
    return mappings

def cast_columns(df, type_mappings):
    for column, new_type in type_mappings.items():
        if isinstance(new_type, str):  # This means it's a datetime with a specific format
            df = df.withColumn(column, when(col(column).isNull(), None)
                                        .otherwise(to_timestamp(col(column), new_type)))
        else:
            df = df.withColumn(column, col(column).cast(new_type))
    return df

### Process

Read .csv $\rightarrow$ Create type mapping dictionary
$\rightarrow$ Cast types $\rightarrow$ Save to .parquet

### Data sets

This needs to be done for the following complaint sets:

1. 311 complaints
2. Housing code complaints

And the following government sets:

1. Pothole work orders
2. Parking violations
3. Vacant lot cleaning
4. Housing code violations


## 311 Dataset

In [6]:
df = spark.read.csv(str(Paths.RAW_DATA_CSV / "base_311.csv"), header=True)

In [7]:
type_mappings = {
"Unique Key":StringType(),
"Created Date":"MM/dd/yyyy hh:mm:ss a",
"Closed Date":"MM/dd/yyyy hh:mm:ss a",
"Agency":StringType(),
"Agency Name":StringType(),
"Complaint Type":StringType(),
"Descriptor":StringType(),
"Location Type":StringType(),
"Incident Zip":IntegerType(),
"Incident Address":StringType(),
"Street Name":StringType(),
"Cross Street 1":StringType(),
"Cross Street 2":StringType(),
"Intersection Street 1":StringType(),
"Intersection Street 2":StringType(),
"Address Type":StringType(),
"City":StringType(),
"Landmark":StringType(),
"Facility Type":StringType(),
"Status":StringType(),
"Due Date":"MM/dd/yyyy hh:mm:ss a",
"Resolution Description":StringType(),
"Resolution Action Updated Date":"MM/dd/yyyy hh:mm:ss a",
"Community Board":StringType(),
"BBL":StringType(),
"Borough":StringType(),
"Open Data Channel Type":StringType(),
"Park Facility Name":StringType(),
"Park Borough":StringType(),
"Vehicle Type":StringType(),
"Taxi Company Borough":StringType(),
"Taxi Pick Up Location":StringType(),
"Bridge Highway Name":StringType(),
"Bridge Highway Direction":StringType(),
"Road Ramp":StringType(),
"Bridge Highway Segment":StringType(),
"Latitude":FloatType(),
"Longitude":FloatType(),
"Location":StringType()
}

In [10]:
df = cast_columns(type_mappings, df)

In [8]:
df.write.mode('overwrite').parquet(str(Paths.RAW_DATA_PARQUET / "user/base_311.parquet"))

23/12/03 20:12:52 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
23/12/03 20:12:52 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
23/12/03 20:12:52 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
23/12/03 20:12:52 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
23/12/03 20:13:02 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
23/12/03 20:13:02 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
23/12/03 20:13:02 WARN MemoryManage

## Housing Code Complaints Dataset

In [23]:
df = spark.read.csv(str(Paths.RAW_DATA_CSV / "base_housing_code_complaints.csv"), header=True)

In [24]:
schema = '''root
 |-- ComplaintID: string (nullable = true)
 |-- BuildingID: string (nullable = true)
 |-- BoroughID: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- HouseNumber: string (nullable = true)
 |-- StreetName: string (nullable = true)
 |-- Zip: string (nullable = true)
 |-- Block: string (nullable = true)
 |-- Lot: string (nullable = true)
 |-- Apartment: string (nullable = true)
 |-- CommunityBoard: string (nullable = true)
 |-- ReceivedDate: string (nullable = true)
 |-- StatusID: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- StatusDate: string (nullable = true)
'''

In [39]:
generate_mappings(schema)

{' ComplaintID': StringType(),
 ' BuildingID': StringType(),
 ' BoroughID': StringType(),
 ' Borough': StringType(),
 ' HouseNumber': StringType(),
 ' StreetName': StringType(),
 ' Zip': StringType(),
 ' Block': StringType(),
 ' Lot': StringType(),
 ' Apartment': StringType(),
 ' CommunityBoard': StringType(),
 ' ReceivedDate': StringType(),
 ' StatusID': StringType(),
 ' Status': StringType(),
 ' StatusDate': StringType()}

In [48]:
type_mappings = {
    "ComplaintID": IntegerType(),
    "BuildingID": IntegerType(),
    "BoroughID": IntegerType(),
    "Borough": StringType(),
    "HouseNumber": StringType(),
    "StreetName": StringType(),
    "Zip": IntegerType(),
    "Block": IntegerType(),
    "Lot": IntegerType(),
    "Apartment": StringType(),
    "CommunityBoard": IntegerType(),
    "ReceivedDate": "MM/dd/yyyy",
    "StatusID": IntegerType(),
    "Status": StringType(),
    "StatusDate": "MM/dd/yyyy",
}

In [49]:
df = cast_columns(df)

In [None]:
df.write.mode('overwrite').parquet(str(Paths.RAW_DATA_PARQUET / "base_housing_code_complaints.parquet"))

## Housing Code Violations Dataset

In [27]:
df = spark.read.csv(str(Paths.RAW_DATA_CSV / "base_housing_code_violations.csv"), header=True)

In [28]:
schema='''root
|-- ViolationID: string (nullable = true)
|-- BuildingID: string (nullable = true)
|-- RegistrationID: string (nullable = true)
|-- BoroID: string (nullable = true)
|-- Borough: string (nullable = true)
|-- HouseNumber: string (nullable = true)
|-- LowHouseNumber: string (nullable = true)
|-- HighHouseNumber: string (nullable = true)
|-- StreetName: string (nullable = true)
|-- StreetCode: string (nullable = true)
|-- Postcode: string (nullable = true)
|-- Apartment: string (nullable = true)
|-- Story: string (nullable = true)
|-- Block: string (nullable = true)
|-- Lot: string (nullable = true)
|-- Class: string (nullable = true)
|-- InspectionDate: string (nullable = true)
|-- ApprovedDate: string (nullable = true)
|-- OriginalCertifyByDate: string (nullable = true)
|-- OriginalCorrectByDate: string (nullable = true)
|-- NewCertifyByDate: string (nullable = true)
|-- NewCorrectByDate: string (nullable = true)
|-- CertifiedDate: string (nullable = true)
|-- OrderNumber: string (nullable = true)
|-- NOVID: string (nullable = true)
|-- NOVDescription: string (nullable = true)
|-- NOVIssuedDate: string (nullable = true)
|-- CurrentStatusID: string (nullable = true)
|-- CurrentStatus: string (nullable = true)
|-- CurrentStatusDate: string (nullable = true)
|-- NovType: string (nullable = true)
|-- ViolationStatus: string (nullable = true)
|-- RentImpairing: string (nullable = true)
|-- Latitude: string (nullable = true)
|-- Longitude: string (nullable = true)
|-- CommunityBoard: string (nullable = true)
|-- CouncilDistrict: string (nullable = true)
|-- CensusTract: string (nullable = true)
|-- BIN: string (nullable = true)
|-- BBL: string (nullable = true)
|-- NTA: string (nullable = true)
'''

In [29]:
generate_mappings(schema)

{' ViolationID': StringType(),
 ' BuildingID': StringType(),
 ' RegistrationID': StringType(),
 ' BoroID': StringType(),
 ' Borough': StringType(),
 ' HouseNumber': StringType(),
 ' LowHouseNumber': StringType(),
 ' HighHouseNumber': StringType(),
 ' StreetName': StringType(),
 ' StreetCode': StringType(),
 ' Postcode': StringType(),
 ' Apartment': StringType(),
 ' Story': StringType(),
 ' Block': StringType(),
 ' Lot': StringType(),
 ' Class': StringType(),
 ' InspectionDate': StringType(),
 ' ApprovedDate': StringType(),
 ' OriginalCertifyByDate': StringType(),
 ' OriginalCorrectByDate': StringType(),
 ' NewCertifyByDate': StringType(),
 ' NewCorrectByDate': StringType(),
 ' CertifiedDate': StringType(),
 ' OrderNumber': StringType(),
 ' NOVID': StringType(),
 ' NOVDescription': StringType(),
 ' NOVIssuedDate': StringType(),
 ' CurrentStatusID': StringType(),
 ' CurrentStatus': StringType(),
 ' CurrentStatusDate': StringType(),
 ' NovType': StringType(),
 ' ViolationStatus': StringTy

In [30]:
type_mappings = {
    "ViolationID": IntegerType(),
    "BuildingID": IntegerType(),
    "RegistrationID": IntegerType(),
    "BoroID": IntegerType(),
    "Borough": StringType(),
    "HouseNumber": StringType(),
    "LowHouseNumber": StringType(),
    "HighHouseNumber": StringType(),
    "StreetName": StringType(),
    "StreetCode": IntegerType(),
    "Postcode": IntegerType(),
    "Apartment": StringType(),
    "Story": IntegerType(),
    "Block": IntegerType(),
    "Lot": IntegerType(),
    "Class": StringType(),
    "InspectionDate":"MM/dd/yyyy",
    "ApprovedDate":"MM/dd/yyyy",
    "OriginalCertifyByDate":"MM/dd/yyyy",
    "OriginalCorrectByDate":"MM/dd/yyyy",
    "NewCertifyByDate":"MM/dd/yyyy",
    "NewCorrectByDate":"MM/dd/yyyy",
    "CertifiedDate":"MM/dd/yyyy",
    "OrderNumber": IntegerType(),
    "NOVID": IntegerType(),
    "NOVDescription": StringType(),
    "NOVIssuedDate":"MM/dd/yyyy",
    "CurrentStatusID": IntegerType(),
    "CurrentStatus": StringType(),
    "CurrentStatusDate":"MM/dd/yyyy",
    "NovType": StringType(),
    "ViolationStatus": StringType(),
    "RentImpairing": StringType(),
    "Latitude": FloatType(),
    "Longitude": FloatType(),
    "CommunityBoard": IntegerType(),
    "CouncilDistrict": IntegerType(),
    "CensusTract": IntegerType(),
    "BIN": IntegerType(),
    "BBL": IntegerType(),
    "NTA": StringType(),
}

In [32]:
df = cast_columns(df,type_mappings)

In [43]:
spark.conf.set("spark.sql.parquet.int96RebaseModeInWrite", "LEGACY")

In [39]:
from pyspark.sql.functions import min, year

In [40]:
df = df.filter(year(col("InspectionDate")) >= 2010)

In [44]:
df.write.mode('overwrite').parquet(str(Paths.RAW_DATA_PARQUET / "gov/housing_code_violations.parquet"))

23/12/03 20:49:44 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
23/12/03 20:49:44 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
23/12/03 20:49:44 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
23/12/03 20:49:50 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
23/12/03 20:49:50 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
23/12/03 20:49:50 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
23/12/03 20:49:50 WARN MemoryManager: Total allocation exceeds 95.00%

## Parking Violations Dataset

In [14]:
df = spark.read.csv(str(Paths.RAW_DATA_CSV / "base_parking.csv"), header=True)

In [13]:
schema='''root
 |-- Plate: string (nullable = true)
 |-- State: string (nullable = true)
 |-- License Type: string (nullable = true)
 |-- Summons Number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Violation Time: string (nullable = true)
 |-- Violation: string (nullable = true)
 |-- Judgment Entry Date: string (nullable = true)
 |-- Fine Amount: string (nullable = true)
 |-- Penalty Amount: string (nullable = true)
 |-- Interest Amount: string (nullable = true)
 |-- Reduction Amount: string (nullable = true)
 |-- Payment Amount: string (nullable = true)
 |-- Amount Due: string (nullable = true)
 |-- Precinct: string (nullable = true)
 |-- County: string (nullable = true)
 |-- Issuing Agency: string (nullable = true)
 |-- Violation Status: string (nullable = true)
 |-- Summons Image: string (nullable = true)'''

In [17]:
generate_mappings(schema)

{' Plate': StringType(),
 ' State': StringType(),
 ' License Type': StringType(),
 ' Summons Number': StringType(),
 ' Issue Date': StringType(),
 ' Violation Time': StringType(),
 ' Violation': StringType(),
 ' Judgment Entry Date': StringType(),
 ' Fine Amount': StringType(),
 ' Penalty Amount': StringType(),
 ' Interest Amount': StringType(),
 ' Reduction Amount': StringType(),
 ' Payment Amount': StringType(),
 ' Amount Due': StringType(),
 ' Precinct': StringType(),
 ' County': StringType(),
 ' Issuing Agency': StringType(),
 ' Violation Status': StringType(),
 ' Summons Image': StringType()}

In [15]:
type_mappings = {
 'Plate': StringType(),
 'State': StringType(),
 'License Type': StringType(),
 'Summons Number': IntegerType(),
 'Issue Date': "MM/dd/yyyy",
 'Violation Time': StringType(),
 'Violation': StringType(),
 'Judgment Entry Date': "MM/dd/yyyy hh:mm:ss a",
 'Fine Amount': FloatType(),
 'Penalty Amount': FloatType(),
 'Interest Amount': FloatType(),
 'Reduction Amount': FloatType(),
 'Payment Amount': FloatType(),
 'Amount Due': FloatType(),
 'Precinct': IntegerType(),
 'County': StringType(),
 'Issuing Agency': StringType(),
 'Violation Status': StringType(),
 'Summons Image': StringType()
}


In [17]:
df = cast_columns(df,type_mappings)

In [None]:
df.write.mode('overwrite').parquet(str(Paths.RAW_DATA_PARQUET / "base_parking.parquet"))

## Pothole Work Orders Dataset

In [9]:
df = spark.read.csv(str(Paths.RAW_DATA_CSV / "base_pothole.csv"), header=True)

In [10]:
schema='''root
 |-- the_geom: string (nullable = true)
 |-- DefNum: string (nullable = true)
 |-- InitBy: string (nullable = true)
 |-- HouseNum: string (nullable = true)
 |-- OFT: string (nullable = true)
 |-- OnFaceName: string (nullable = true)
 |-- OnPrimName: string (nullable = true)
 |-- FrmPrimNam: string (nullable = true)
 |-- ToPrimName: string (nullable = true)
 |-- SpecLoc: string (nullable = true)
 |-- Boro: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- RptDate: string (nullable = true)
 |-- RptClosed: string (nullable = true)
 |-- Shape_Leng: string (nullable = true)'''

In [11]:
generate_mappings(schema)

{' the_geom': StringType(),
 ' DefNum': StringType(),
 ' InitBy': StringType(),
 ' HouseNum': StringType(),
 ' OFT': StringType(),
 ' OnFaceName': StringType(),
 ' OnPrimName': StringType(),
 ' FrmPrimNam': StringType(),
 ' ToPrimName': StringType(),
 ' SpecLoc': StringType(),
 ' Boro': StringType(),
 ' Source': StringType(),
 ' RptDate': StringType(),
 ' RptClosed': StringType(),
 ' Shape_Leng': StringType()}

In [12]:
type_mappings = {
 'the_geom': StringType(),
 'DefNum': StringType(),
 'InitBy': StringType(),
 'HouseNum': IntegerType(),
 'OFT': LongType(),
 'OnFaceName': StringType(),
 'OnPrimName': StringType(),
 'FrmPrimNam': StringType(),
 'ToPrimName': StringType(),
 'SpecLoc': StringType(),
 'Boro': StringType(),
 'Source': StringType(),
 'RptDate': "MM/dd/yyyy",
 'RptClosed': "MM/dd/yyyy",
 'Shape_Leng': DoubleType()
}

In [13]:
df = cast_columns(df, type_mappings)

In [16]:
df.columns

['the_geom',
 'DefNum',
 'InitBy',
 'HouseNum',
 'OFT',
 'OnFaceName',
 'OnPrimName',
 'FrmPrimNam',
 'ToPrimName',
 'SpecLoc',
 'Boro',
 'Source',
 'RptDate',
 'RptClosed',
 'Shape_Leng']

In [18]:
df.columns

['the_geom',
 'DefNum',
 'InitBy',
 'HouseNum',
 'OFT',
 'OnFaceName',
 'OnPrimName',
 'FrmPrimNam',
 'ToPrimName',
 'SpecLoc',
 'Boro',
 'Source',
 'RptDate',
 'RptClosed',
 'Shape_Leng']

In [19]:
df.write.mode('overwrite').parquet(str(Paths.RAW_DATA_PARQUET / "gov/pothole_gov.parquet"))

23/12/03 20:20:02 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
23/12/03 20:20:02 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
23/12/03 20:20:02 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
23/12/03 20:20:03 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
23/12/03 20:20:03 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

## Vacant Lot Cleaning Dataset

In [26]:
df = spark.read.csv(str(Paths.RAW_DATA_CSV / "base_vacant_lot.csv"),header=True)

In [31]:
schema = '''root
 |-- LotId: string (nullable = true)
 |--  InspectionRequestId: string (nullable = true)
 |--  LotNo: string (nullable = true)
 |--  PrivateInd: string (nullable = true)
 |--  AreaId: string (nullable = true)
 |--  LotStatusId: string (nullable = true)
 |--  LotStatusDate: string (nullable = true)
 |--  LotStatusReason: string (nullable = true)
 |--  OperId: string (nullable = true)
 |--  LotEntryDate: string (nullable = true)
 |--  BillingDate: string (nullable = true)
 |--  LotBllingTotal: string (nullable = true)
 |--  OverriteInd: string (nullable = true)
 |--  perimeter_ind: string (nullable = true)
 |--  FiveDayLetterNo: string (nullable = true)
 |--  Notes: string (nullable = true)
 |--  PSId: string (nullable = true)
 |--  DupCCU: string (nullable = true)
 |--  ActivityId: string (nullable = true)
 |--  NoOfAddresses: string (nullable = true)
 |--  RefNo: string (nullable = true)
 |--  LotNJInspectionRequestId: string (nullable = true)
 |--  LotDupInspectionRequestId: string (nullable = true)
 |--  LotIsSysFlagedDupOverridden: string (nullable = true)
 |--  LotIsSysFlagedNJOverridden: string (nullable = true)
 |--  LotPrevStatusIdBfrDupOrNJ: string (nullable = true)
 |--  LegalNotes: string (nullable = true)
 |--  FieldNotes: string (nullable = true)
 |--  supp_ind: string (nullable = true)
 |--  Old_lot_id: string (nullable = true)
 |--  Old_status_dd: string (nullable = true)
 |--  Old_status_rea_dd: string (nullable = true)
 |--  LotConversion_Notes: string (nullable = true)
 |--  OwnerLotLCD: string (nullable = true)
 |--  logno: string (nullable = true)
 |--  Indexno: string (nullable = true)
 |--  RefferedToDOHMHDate: string (nullable = true)
 |--  Structure: string (nullable = true)
 |--  ActivityDetailId: string (nullable = true)
 |--  TitleId: string (nullable = true)
 |--  Hours: string (nullable = true)
 |--  OperatorId_Detail: string (nullable = true)
 |--  DetailEntryDate: string (nullable = true)
 |--  DumpSiteId: string (nullable = true)
 |--  VehicleNo: string (nullable = true)
 |--  Act_Date: string (nullable = true)
 |--  ActivityEntryDate: string (nullable = true)
 |--  AbandonedCars: string (nullable = true)
 |--  ActivityStatusID: string (nullable = true)
 |--  EndDate: string (nullable = true)
 |--  ActivityBllingTotal: string (nullable = true)
 |--  ActivityStatusDate: string (nullable = true)
 |--  Total_hrs: string (nullable = true)
 |--  Total_tonnage: string (nullable = true)
 |--  Total_loads: string (nullable = true)
 |--  S_tires: string (nullable = true)
 |--  l_tires: string (nullable = true)
 |--  civilian_hrs: string (nullable = true)
 |--  fel_tires: string (nullable = true)
 |--  uniform_hrs: string (nullable = true)
 |--  Old_Main_lot_id: string (nullable = true)
 |--  StartDate: string (nullable = true)
 |--  Act_Old_lot_id: string (nullable = true)
 |--  LotCleaningTotal: string (nullable = true)
 |--  SalesTax: string (nullable = true)
 |--  IsBillGenerated: string (nullable = true)
 |--  Prefix: string (nullable = true)
 |--  House1: string (nullable = true)
 |--  House2: string (nullable = true)
 |--  StreetAddress: string (nullable = true)
 |--  RequestDate: string (nullable = true)
 |--  LastName: string (nullable = true)
 |--  FirstName: string (nullable = true)
 |--  BoroId: string (nullable = true)
 |--  DistrictId: string (nullable = true)
 |--  Note: string (nullable = true)
 |--  CCUYear: string (nullable = true)
 |--  CCUNo: string (nullable = true)
 |--  UserId: string (nullable = true)
 |--  RequestEntryDate: string (nullable = true)
 |--  RefId: string (nullable = true)
 |--  RequestSourceId: string (nullable = true)
 |--  ScanNo: string (nullable = true)
 |--  IsDeleted: string (nullable = true)
 |--  AssignedToInspector: string (nullable = true)
 |--  CrossStreetOne: string (nullable = true)
 |--  CrossStreettwo: string (nullable = true)
 |--  Channel: string (nullable = true)
 |--  Program: string (nullable = true)
 |--  Area: string (nullable = true)
 |--  CRMNo: string (nullable = true)
 |--  Phone: string (nullable = true)
 |--  CoordinateX: string (nullable = true)
 |--  CoordinateY: string (nullable = true)
 |--  SegmentId: string (nullable = true)
 |--  Bin: string (nullable = true)
 |--  Block: string (nullable = true)
 |--  RequestStatusID: string (nullable = true)
 |--  RequestStatusReason: string (nullable = true)
 |--  RequestStatusDate: string (nullable = true)
 |--  FLP: string (nullable = true)
 |--  InspectionDate: string (nullable = true)
 |--  InspectorFindings: string (nullable = true)
 |--  IsDuplicate: string (nullable = true)
 |--  Used_Prefix: string (nullable = true)
 |--  Used_House1: string (nullable = true)
 |--  Used_House2: string (nullable = true)
 |--  Used_StreetAddress: string (nullable = true)
 |--  Used_CrossStreetOne: string (nullable = true)
 |--  Used_CrossStreetTwo: string (nullable = true)
 |--  DupInspectionRequestId: string (nullable = true)
 |--  NJInspectionRequestId: string (nullable = true)
 |--  IsSysFlagedDupOverridden: string (nullable = true)
 |--  IsSysFlagedNJOverridden: string (nullable = true)
 |--  Zone: string (nullable = true)
 |--  ImportFromScan: string (nullable = true)
 |--  CopyExistingAddress: string (nullable = true)
 |--  Old_Inspect_ID: string (nullable = true)
 |--  Block_loc: string (nullable = true)
 |--  Used_Block_loc: string (nullable = true)
 |--  loc_id: string (nullable = true)
 |--  RequestConversion_Notes: string (nullable = true)
 |--  OriginalOverride: string (nullable = true)
 |--  UsedOverride: string (nullable = true)'''

s

In [34]:
generate_mappings(schema)

{' LotId': StringType(),
 '  InspectionRequestId': StringType(),
 '  LotNo': StringType(),
 '  PrivateInd': StringType(),
 '  AreaId': StringType(),
 '  LotStatusId': StringType(),
 '  LotStatusDate': StringType(),
 '  LotStatusReason': StringType(),
 '  OperId': StringType(),
 '  LotEntryDate': StringType(),
 '  BillingDate': StringType(),
 '  LotBllingTotal': StringType(),
 '  OverriteInd': StringType(),
 '  perimeter_ind': StringType(),
 '  FiveDayLetterNo': StringType(),
 '  Notes': StringType(),
 '  PSId': StringType(),
 '  DupCCU': StringType(),
 '  ActivityId': StringType(),
 '  NoOfAddresses': StringType(),
 '  RefNo': StringType(),
 '  LotNJInspectionRequestId': StringType(),
 '  LotDupInspectionRequestId': StringType(),
 '  LotIsSysFlagedDupOverridden': StringType(),
 '  LotIsSysFlagedNJOverridden': StringType(),
 '  LotPrevStatusIdBfrDupOrNJ': StringType(),
 '  LegalNotes': StringType(),
 '  FieldNotes': StringType(),
 '  supp_ind': StringType(),
 '  Old_lot_id': StringType(

In [45]:
type_mappings = {
 'LotId': IntegerType(),
 'InspectionRequestId': IntegerType(),
 'LotNo': IntegerType(),
 'PrivateInd': IntegerType(),
 'AreaId': IntegerType(),
 'LotStatusId': IntegerType(),
 'LotStatusDate': "MM/dd/yyyy hh:mm:ss a",
 'LotStatusReason': StringType(),
 'OperId': IntegerType(),
 'LotEntryDate': "MM/dd/yyyy hh:mm:ss a",
 'BillingDate': "MM/dd/yyyy hh:mm:ss a",
 'LotBllingTotal': FloatType(),
 'OverriteInd': IntegerType(),
 'perimeter_ind': IntegerType(),
 'FiveDayLetterNo': IntegerType(),
 'Notes': StringType(),
 'PSId': IntegerType(),
 'DupCCU': StringType(),
 'ActivityId': IntegerType(),
 'NoOfAddresses': IntegerType(),
 'RefNo': StringType(),
 'LotNJInspectionRequestId': IntegerType(),
 'LotDupInspectionRequestId': IntegerType(),
 'LotIsSysFlagedDupOverridden': IntegerType(),
 'LotIsSysFlagedNJOverridden': IntegerType(),
 'LotPrevStatusIdBfrDupOrNJ': IntegerType(),
 'LegalNotes': StringType(),
 'FieldNotes': StringType(),
 'supp_ind': StringType(),
 'Old_lot_id': StringType(),
 'Old_status_dd': StringType(),
 'Old_status_rea_dd': StringType(),
 'LotConversion_Notes': StringType(),
 'OwnerLotLCD': StringType(),
 'logno': StringType(),
 'Indexno': StringType(),
 'RefferedToDOHMHDate': "MM/dd/yyyy hh:mm:ss a",
 'Structure': StringType(),
 'ActivityDetailId': IntegerType(),
 'TitleId': IntegerType(),
 'Hours': FloatType(),
 'OperatorId_Detail': IntegerType(),
 'DetailEntryDate': "MM/dd/yyyy hh:mm:ss a",
 'DumpSiteId': IntegerType(),
 'VehicleNo': StringType(),
 'Act_Date': "MM/dd/yyyy hh:mm:ss a",
 'ActivityEntryDate': "MM/dd/yyyy hh:mm:ss a",
 'AbandonedCars': IntegerType(),
 'ActivityStatusID': IntegerType(),
 'EndDate': "MM/dd/yyyy hh:mm:ss a",
 'ActivityBllingTotal': StringType(),
 'ActivityStatusDate': "MM/dd/yyyy hh:mm:ss a",
 'Total_hrs': FloatType(),
 'Total_tonnage': FloatType(),
 'Total_loads': FloatType(),
 'S_tires': IntegerType(),
 'l_tires': IntegerType(),
 'civilian_hrs': IntegerType(),
 'fel_tires': IntegerType(),
 'uniform_hrs': IntegerType(),
 'Old_Main_lot_id': StringType(),
 'StartDate': "MM/dd/yyyy hh:mm:ss a",
 'Act_Old_lot_id': StringType(),
 'LotCleaningTotal': FloatType(),
 'SalesTax': FloatType(),
 'IsBillGenerated': IntegerType(),
 'Prefix': IntegerType(),
 'House1': IntegerType(),
 'House2': IntegerType(),
 'StreetAddress': StringType(),
 'RequestDate': "MM/dd/yyyy hh:mm:ss a",
 'LastName': StringType(),
 'FirstName': StringType(),
 'BoroId': IntegerType(),
 'DistrictId': IntegerType(),
 'Note': StringType(),
 'CCUYear': IntegerType(),
 'CCUNo': IntegerType(),
 'UserId': IntegerType(),
 'RequestEntryDate': "MM/dd/yyyy hh:mm:ss a",
 'RefId': IntegerType(),
 'RequestSourceId': IntegerType(),
 'ScanNo': StringType(),
 'IsDeleted': IntegerType(),
 'AssignedToInspector': StringType(),
 'CrossStreetOne': StringType(),
 'CrossStreettwo': StringType(),
 'Channel': IntegerType(),
 'Program': IntegerType(),
 'Area': IntegerType(),
 'CRMNo': IntegerType(),
 'Phone': StringType(),
 'CoordinateX': IntegerType(),
 'CoordinateY': IntegerType(),
 'SegmentId': IntegerType(),
 'Bin': IntegerType(),
 'Block': IntegerType(),
 'RequestStatusID': IntegerType(),
 'RequestStatusReason': StringType(),
 'RequestStatusDate': "MM/dd/yyyy hh:mm:ss a",
 'FLP': IntegerType(),
 'InspectionDate': "MM/dd/yyyy hh:mm:ss a",
 'InspectorFindings': StringType(),
 'IsDuplicate': IntegerType(),
 'Used_Prefix': IntegerType(),
 'Used_House1': IntegerType(),
 'Used_House2': IntegerType(),
 'Used_StreetAddress': StringType(),
 'Used_CrossStreetOne': StringType(),
 'Used_CrossStreetTwo': StringType(),
 'DupInspectionRequestId': IntegerType(),
 'NJInspectionRequestId': IntegerType(),
 'IsSysFlagedDupOverridden': IntegerType(),
 'IsSysFlagedNJOverridden': IntegerType(),
 'Zone': IntegerType(),
 'ImportFromScan': IntegerType(),
 'CopyExistingAddress': IntegerType(),
 'Old_Inspect_ID': IntegerType(),
 'Block_loc': StringType(),
 'Used_Block_loc': StringType(),
 'loc_id': StringType(),
 'RequestConversion_Notes': StringType(),
 'OriginalOverride': IntegerType(),
 'UsedOverride': IntegerType()}

In [47]:
df = df.select([col(c).alias(c.replace(' ', '')) for c in df.columns])

In [48]:
df = cast_columns(df, type_mappings)

In [None]:
df.write.mode('overwrite').parquet(str(Paths.RAW_DATA_PARQUET / "base_vacant_lot.parquet"))

Now for 4 gov datasets (Housing code, parking, pothole, vacant lot)