## Task 1
Import CSVs and store them in:

        Parquet format
        Avro format
        HDF5 format

In [1]:
import warnings; warnings.simplefilter(action='ignore', category=FutureWarning)
import pyarrow as pa
import pyarrow.parquet as pq
from pyarrow import csv
from pyarrow import dataset as ds
import os
import numpy as np
import pandas as pd


import dask_jobqueue
import dask
import dask.dataframe as dd
import time

from distributed import Client
from contextlib import suppress 
import platform

PATH = "/d/hpc/projects/FRI/bigdata/data/NYTickets"

In [114]:
with suppress(Exception):
    client.shutdown()

cluster = dask_jobqueue.SLURMCluster(
            queue = 'all', 
            processes=1,
            cores=32, 
            memory='8GB',
            scheduler_options={'dashboard_address': ':21722'},
            death_timeout=180, # seconds
            walltime="01:30",
          )

client = Client(cluster, timeout="180")#, memory_limit='8GB')
display(client.cluster)

Tab(children=(HTML(value='<div class="jp-RenderedHTMLCommon jp-RenderedHTML jp-mod-trusted jp-OutputArea-outpu…

In [113]:
client.shutdown()

In [115]:
client.cluster.scale(32)
display(client.cluster)

Tab(children=(HTML(value='<div class="jp-RenderedHTMLCommon jp-RenderedHTML jp-mod-trusted jp-OutputArea-outpu…

In [4]:
files = os.listdir(PATH)
files = list([f for f in files if ".csv" in f])
files.sort()
#files = files[1:]
print(files)

['2014.csv', '2015.csv', '2016.csv', '2017.csv', '2018.csv', '2019.csv', '2020.csv', '2021.csv', '2022.csv']


### Parquet part

In [5]:
column_types = {
"Summons Number" : pa.int64(),                     
"Plate ID" : pa.string(),                           
"Registration State" : pa.string(),                 
"Plate Type" : pa.string(),                         
"Issue Date" : pa.string(),                         
"Violation Code" : pa.int64(),                      
"Vehicle Body Type" : pa.string(),                  
"Vehicle Make" : pa.string(),                       
"Issuing Agency" : pa.string(),                     
"Street Code1" : pa.int64(),                        
"Street Code2" : pa.int64(),                        
"Street Code3" : pa.int64(),                                  
"Vehicle Expiration Date" : pa.string(),            
"Violation Location" : pa.int64(),                  
"Violation Precinct" : pa.int64(),                  
"Issuer Precinct" : pa.int64(),                     
"Issuer Code" : pa.int64(),                         
"Issuer Command" : pa.string(),                     
"Issuer Squad" : pa.string(),                       
"Violation Time" : pa.string(),                     
"Time First Observed" : pa.string(),                
"Violation County" : pa.string(),                   
"Violation In Front Of Or Opposite" : pa.string(),  
"Number" : pa.string(),                             
"House Number" : pa.string(),                       
"Street" : pa.string(),                             
"Street Name" : pa.string(),                        
"Intersecting Street" : pa.string(),                
"Date First Observed" : pa.string(),                
"Law Section" : pa.int64(),                         
"Sub Division" : pa.string(),                       
"Violation Legal Code" : pa.string(),               
"Days Parking In Effect    " : pa.string(),         
"From Hours In Effect" : pa.string(),               
"To Hours In Effect" : pa.string(),                 
"Vehicle Color" : pa.string(),                      
"Unregistered Vehicle?" : pa.string(),               
"Vehicle Year" : pa.int64(),                        
"Meter Number" : pa.string(),                       
"Feet From Curb" : pa.int64(),                      
"Violation Post Code" : pa.string(),                
"Violation Description" : pa.string(),              
"No Standing or Stopping Violation" : pa.string(),  
"Hydrant Violation" : pa.string(),                  
"Double Parking Violation" : pa.string()}  

In [116]:
convert_options = csv.ConvertOptions(column_types=column_types)
custom_csv_format = ds.CsvFileFormat(convert_options=convert_options)
to_drop = ["Plate ID","Time First Observed","Violation In Front Of Or Opposite","Feet From Curb",
           "Violation Description","Hydrant Violation",
          "Double Parking Violation" , "Number","Sub Division",  "Street", "Street Name", "No Standing or Stopping Violation", 
           "Meter Number", "Date First Observed" , "Unregistered Vehicle?", "Violation Location", 
           "Issuer Squad", "Issuer Command", "Violation Precinct", "Issuer Precinct", "Issuer Code"
           "Days Parking In Effect", "House Number"]

def drop_cols_parquet(table, drops = to_drop):
    for d in drops:
        try:
            table = table.drop([d])
        except:
            continue
    return table

In [8]:
t = ds.dataset(PATH+"/"+files[0], format=custom_csv_format).to_table()
col_names = t.column_names
#print(col_names)
t = drop_cols_parquet(t)

s = time.time()
for f in files[1:]:
    tmp = ds.dataset(PATH+"/"+f, format=custom_csv_format).to_table()
    tmp = tmp.rename_columns(col_names)
    tmp = drop_cols_parquet(tmp)
    t = pa.concat_tables([t, tmp])
    print(time.time()-s)
#pq.write_table(t, '/d/hpc/projects/FRI/bigdata/students/mfmt/entire_1.parquet')

21.00762701034546
52.52729058265686
72.89254307746887
94.5622627735138
114.52813792228699
151.75891304016113
191.7809453010559
324.6406235694885


### Avro part

In [7]:
avro_schema = {'name': 'NY_park_ticket', 'doc':"New York data for parking tickets ",
          'type': 'record',
          'fields': [{'name': 'Summons Number', 'type': ["null",  "float", "string"]}, 
{'name':  'Registration State', 'type': ["null",  "float", "string"]}, 
{'name':  'Plate Type', 'type': ["null",  "float", "string"]}, 
{'name':  'Issue Date', 'type': ["null",  "float", "string"]}, 
{'name': 'Violation Code', 'type': ["null",  "float", "string"]}, 
{'name':  'Vehicle Body Type', 'type': ["null",  "float", "string"]}, 
{'name':  'Vehicle Make', 'type': ["null",  "float", "string"]}, 
{'name':  'Issuing Agency', 'type': ["null",  "float", "string"]}, 
{'name':  'Street Code1', 'type': ["null",  "float", "string"]}, 
{'name':  'Street Code2', 'type': ["null",  "float", "string"]}, 
{'name':  'Street Code3', 'type': ["null",  "float", "string"]}, 
{'name':        'Vehicle Expiration Date', 'type': ["null",  "float", "string"]}, 
{'name':  'Violation Location', 'type': ["null",  "float", "string"]}, 
{'name':  'Violation Precinct', 'type': ["null",  "float", "string"]}, 
{'name':        'Issuer Precinct', 'type': ["null",  "float", "string"]}, 
{'name':  'Issuer Code', 'type': ["null",  "float", "string"]}, 
{'name':  'Issuer Command', 'type': ["null",  "float", "string"]}, 
{'name':  'Issuer Squad', 'type': ["null",  "float", "string"]}, 
{'name':        'Violation Time', 'type': ["null",  "float", "string"]}, 
{'name':  'Violation County', 'type': ["null",  "float", "string"]}, 
{'name':  'Number', 'type': ["null",  "float", "string"]}, 
{'name':  'Intersecting Street', 'type': ["null",  "float", "string"]}, 
{'name':        'Law Section', 'type': ["null",  "float", "string"]}, 
{'name':  'Violation Legal Code', 'type': ["null",  "float", "string"]}, 
{'name':  'Days Parking In Effect    ', 'type': ["null",  "float", "string"]}, 
{'name':        'From Hours In Effect', 'type': ["null",  "float", "string"]}, 
{'name':  'To Hours In Effect', 'type': ["null",  "float", "string"]}, 
{'name':  'Vehicle Color', 'type': ["null",  "float", "string"]}, 
{'name':        'Unregistered Vehicle?', 'type': ["null",  "float", "string"]}, 
{'name':  'Vehicle Year', 'type': ["null",  "float", "string"]}, 
{'name':  'Violation Post Code', 'type': ["null",  "float", "string"]}, 
{'name':        'House Number', 'type': ["null",  "float", "string"]}, 
{'name':  'Days Parking In Effect', 'type':['null', 'float','string']}]}

In [37]:
avro_schema = {'name': 'Violations', 'doc': 'Parking Violations Dataset',
                   'type': 'record',
                   'fields': [
                       {'name': 'Summons Number', 'type': 'int'},
                       {'name': 'Plate ID', 'type': ['string', 'float'], 'default':'null'},
                       {'name': 'Registration State', 'type': ['string', 'float'], 'default':'null'},
                       {'name': 'Plate Type', 'type': ['string', 'float'], 'default':'null'},
                       {'name': 'Issue Date', 'type': ['string', 'float'], 'default':'null'},
                       {'name': 'Violation Code', 'type': ['string', 'float'], 'default':'null'},
                       {'name': 'Vehicle Body Type', 'type': ['string', 'float'], 'default':'null'},
                       {'name': 'Vehicle Make', 'type': ['string', 'float'], 'default':'null'},
                       {'name': 'Issuing Agency', 'type': ['string', 'float'], 'default':'null'},
                       {'name': 'Street Code1', 'type': ['string', 'float'], 'default':'null'},
                       {'name': 'Street Code2', 'type': ['string', 'float'], 'default':'null'},
                       {'name': 'Street Code3', 'type': ['string', 'float'], 'default':'null'},
                       {'name': 'Vehicle Expiration Date', 'type': ['string', 'float'], 'default':'null'},
                       {'name': 'Violation Location', 'type': ['string', 'float'], 'default':'null'},
                       {'name': 'Violation Precinct', 'type': ['string', 'float'], 'default':'null'},
                       {'name': 'Issuer Precinct', 'type': ['string', 'float'], 'default':'null'},
                       {'name': 'Issuer Code', 'type': ['string', 'float'], 'default':'null'},
                       {'name': 'Issuer Command', 'type': ['string', 'float'], 'default':'null'},
                       {'name': 'Issuer Squad', 'type': ['string', 'float'], 'default':'null'},
                       {'name': 'Violation Time', 'type': ['string', 'float', 'int'], 'default':'null'},
                       {'name': 'Time First Observed', 'type': ['string', 'float'], 'default':'null'},
                       {'name': 'Violation County', 'type': ['string', 'float'], 'default':'null'},
                       {'name': 'Violation In Front Of Or Opposite', 'type': ['string', 'float'], 'default':'null'},
                       {'name': 'House Number', 'type': ['string', 'float'], 'default':'null'},
                       {'name': 'Street Name', 'type': ['string', 'float'], 'default':'null'},
                       {'name': 'Intersecting Street', 'type': ['string', 'float'], 'default':'null'},
                       {'name': 'Date First Observed', 'type': 'float','default':-1},
                       {'name': 'Law Section', 'type': 'float'},
                       {'name': 'Sub Division', 'type': ['string', 'float'], 'default':'null'},
                       {'name': 'Violation Legal Code', 'type': ['string', 'float']},
                       {'name': 'Days Parking In Effect    ', 'type': ['string', 'float']},
                       {'name': 'From Hours In Effect', 'type': ['string', 'float']},
                       {'name': 'To Hours In Effect', 'type': ['string', 'float']},
                       {'name': 'Vehicle Color', 'type': ['string', 'float']},
                       {'name': 'Unregistered Vehicle?', 'type': ['string', 'float'], 'default':'null'},
                       {'name': 'Vehicle Year', 'type': 'float', 'default':-1},
                       {'name': 'Meter Number', 'type': ['string', 'float'], 'default':'null'},
                       {'name': 'Feet From Curb', 'type': 'float', 'default':-1},
                       {'name': 'Violation Post Code', 'type': ['string', 'float'], 'default':'null'},
                       {'name': 'Violation Description', 'type': ['string', 'float'], 'default':'null'},
                       {'name': 'No Standing or Stopping Violation', 'type': ['string', 'float'], 'default':'null'},
                       {'name': 'Hydrant Violation', 'type': ['string', 'float'], 'default':'null'},
                       {'name': 'Double Parking Violation', 'type': ['string', 'float'], 'default':'null'}
                   ]}

In [117]:
col_types_dask = {
"Summons Number" : "int",#"Int64",                     
"Plate ID" : object,                           
"Registration State" : object,                 
"Plate Type" : object,                         
"Issue Date" : object,                         
"Violation Code" : "int",                      
"Vehicle Body Type" : object,                  
"Vehicle Make" : object,                       
"Issuing Agency" : object,                     
"Street Code1" : "int",                        
"Street Code2" : "int",                        
"Street Code3" : "int",                                  
"Vehicle Expiration Date" : object,            
"Violation Location" : "Int64",                  
"Violation Precinct" : "int",                  
"Issuer Precinct" : "int",                     
"Issuer Code" : "int",                         
"Issuer Command" : object,                     
"Issuer Squad" : object,                       
"Violation Time" : object,                     
"Time First Observed" : object,                
"Violation County" : object,                   
"Violation In Front Of Or Opposite" : object,  
"Number" : object,                             
"House Number" : object,                       
"Street" : object,                             
"Street Name" : object,                        
"Intersecting Street" : object,                
"Date First Observed" : object,                
"Law Section" : object,                         
"Sub Division" : object,                       
"Violation Legal Code" : object,               
"Days Parking In Effect    " : object,         
"From Hours In Effect" : object,               
"To Hours In Effect" : object,                 
"Vehicle Color" : object,                      
"Unregistered Vehicle?" : object,               
"Vehicle Year" : "int",                        
"Meter Number" : object,                       
"Feet From Curb" : "int",                      
"Violation Post Code" : object,                
"Violation Description" : object,              
"No Standing or Stopping Violation" : object,  
"Hydrant Violation" : object,                  
"Double Parking Violation" : object}  

In [118]:
#raw_data = raw_data.drop('some_great_column', axis=1)

def drop_cols_dask(table, drops = to_drop):
    for d in drops:
        try:
            table = table.drop(d, axis = 1)
        except:
            continue
    return table

In [119]:
dfs = []
for f in files:
    ddf = dd.read_csv(PATH+'/'+f, dtype=col_types_dask)
    ddf = drop_cols_dask(ddf)
    dfs.append(ddf)
ddf = dd.concat(dfs)
ddf = ddf.dropna(subset=['Vehicle Year', 'Issuer Code'])

In [120]:
ddf.persist()

Unnamed: 0_level_0,Summons Number,Registration State,Plate Type,Issue Date,Violation Code,Vehicle Body Type,Vehicle Make,Issuing Agency,Street Code1,Street Code2,Street Code3,Vehicle Expiration Date,Issuer Code,Violation Time,Violation County,Intersecting Street,Law Section,Violation Legal Code,Days Parking In Effect,From Hours In Effect,To Hours In Effect,Vehicle Color,Vehicle Year,Violation Post Code,Days Parking In Effect
npartitions=302,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1,Unnamed: 24_level_1,Unnamed: 25_level_1
,int64,object,object,object,int64,object,object,object,int64,int64,int64,object,int64,object,object,object,object,object,object,object,object,object,int64,object,object
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


#### KEEP THIS CELL BELOW, AS IT SHOWS SOME COLUMNS HAVE VERY LITTLE DATA 
So it does make sense to drop them!

In [81]:
ddf.count().compute()

2022-06-01 16:13:35,993 - distributed.client - ERROR - Failed to reconnect to scheduler after 120.00 seconds, closing client


Summons Number                102960276
Registration State            102960276
Plate Type                    102960276
Issue Date                    102959972
Violation Code                102960276
Vehicle Body Type             102522730
Vehicle Make                  102441274
Issuing Agency                102960274
Street Code1                  102960276
Street Code2                  102960276
Street Code3                  102960276
Vehicle Expiration Date        93350244
Violation Location             79169011
Violation Precinct            102960275
Issuer Precinct               102960275
Issuer Code                   102960275
Issuer Command                 79395671
Issuer Squad                   79080226
Violation Time                102951319
Violation County               98468113
Number                          8243961
Intersecting Street            35373494
Law Section                   102960273
Violation Legal Code           23569281
Days Parking In Effect         48300433


In [68]:
bag = ddf.to_bag(index = False, format='dict').persist()

In [69]:
bag.to_avro('/d/hpc/projects/FRI/bigdata/students/mfmt/avro/avr_data_*.avro', avro_schema)

2022-06-02 12:30:53,713 - distributed.client - ERROR - Failed to reconnect to scheduler after 180.00 seconds, closing client


['/d/hpc/projects/FRI/bigdata/students/mfmt/avro/avr_data_000.avro',
 '/d/hpc/projects/FRI/bigdata/students/mfmt/avro/avr_data_001.avro',
 '/d/hpc/projects/FRI/bigdata/students/mfmt/avro/avr_data_002.avro',
 '/d/hpc/projects/FRI/bigdata/students/mfmt/avro/avr_data_003.avro',
 '/d/hpc/projects/FRI/bigdata/students/mfmt/avro/avr_data_004.avro',
 '/d/hpc/projects/FRI/bigdata/students/mfmt/avro/avr_data_005.avro',
 '/d/hpc/projects/FRI/bigdata/students/mfmt/avro/avr_data_006.avro',
 '/d/hpc/projects/FRI/bigdata/students/mfmt/avro/avr_data_007.avro',
 '/d/hpc/projects/FRI/bigdata/students/mfmt/avro/avr_data_008.avro',
 '/d/hpc/projects/FRI/bigdata/students/mfmt/avro/avr_data_009.avro',
 '/d/hpc/projects/FRI/bigdata/students/mfmt/avro/avr_data_010.avro',
 '/d/hpc/projects/FRI/bigdata/students/mfmt/avro/avr_data_011.avro',
 '/d/hpc/projects/FRI/bigdata/students/mfmt/avro/avr_data_012.avro',
 '/d/hpc/projects/FRI/bigdata/students/mfmt/avro/avr_data_013.avro',
 '/d/hpc/projects/FRI/bigdata/stud

In [74]:
tr = dask.bag.read_avro('/d/hpc/projects/FRI/bigdata/students/mfmt/avro/avr_data_301.avro')

In [75]:
tr.take(2)

({'Summons Number': 8819046714,
  'Plate ID': 'null',
  'Registration State': 'NY',
  'Plate Type': 'PAS',
  'Issue Date': '01/27/2022',
  'Violation Code': 20.0,
  'Vehicle Body Type': 'PICK',
  'Vehicle Make': 'RAM',
  'Issuing Agency': 'T',
  'Street Code1': 69730.0,
  'Street Code2': 67730.0,
  'Street Code3': 35130.0,
  'Vehicle Expiration Date': '20220924',
  'Violation Location': 'null',
  'Violation Precinct': 70.0,
  'Issuer Precinct': 70.0,
  'Issuer Code': 361784.0,
  'Issuer Command': 'null',
  'Issuer Squad': 'null',
  'Violation Time': '0630A',
  'Time First Observed': 'null',
  'Violation County': 'K',
  'Violation In Front Of Or Opposite': 'null',
  'House Number': 'null',
  'Street Name': 'null',
  'Intersecting Street': nan,
  'Date First Observed': -1.0,
  'Law Section': 408.0,
  'Sub Division': 'null',
  'Violation Legal Code': nan,
  'Days Parking In Effect    ': 'YYYYYYY',
  'From Hours In Effect': nan,
  'To Hours In Effect': nan,
  'Vehicle Color': 'BK',
  'Unre

2022-06-02 12:55:44,793 - distributed.client - ERROR - Failed to reconnect to scheduler after 180.00 seconds, closing client


#### HDF 5 FILE

In [124]:
ddf.to_hdf('/d/hpc/projects/FRI/bigdata/students/mfmt/hdf5/output.hdf', '/data') 

ValueError: Integer column has NA values in column 35