In [1]:
import argparse
import os
import sys
import psutil
import numpy as np
import pandas as pd
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
#from datetime import datetime
import math
from IPython.display import display
from multiprocessing import cpu_count,Pool 
import multiprocessing
from joblib import Parallel, delayed
from orderedset import OrderedSet
import datetime

In [2]:
def print_memory_usage():
    print ("memory log:")
    process = psutil.Process(os.getpid())
    print("%5.2f GB (RSS)" % (process.memory_info().rss / 2**30))
    print("%5.2f GB (VMS)" % (process.memory_info().vms / 2**30))
    print("%5.2f GB (Used)" % (psutil.virtual_memory().used / 2**30))
    print("%5.2f GB (Available)" % (psutil.virtual_memory().available / 2**30))
    print("%5.2f GB (Total)" % (psutil.virtual_memory().total / 2**30))


def days_between(data):
    #print (multiprocessing.current_process())
    data['duration'] = abs((data.EndTime - data.StartTime).astype('timedelta64[s]')/60)
    return data

def type_modif(data):
    #created = multiprocessing.Process()
    #print ('created:', created.name, created._identity)
    #print ("Process created")
    data['RefinedType'] = data.RefinedType_.apply(lambda x: x.split('-')[0])
    return data

def State_set(data,map):
    #mask = (data.Type == 'W')
    #z_valid = data[mask]
    data['City']=data.apply(lambda row: map[row['AirportCode']]['City'] if (row['Type']=='W' and row['AirportCode'] in map.keys()) else row['City'],axis=1)
    data['State']=data.apply(lambda row: map[row['AirportCode']]['State'] if (row['Type']=='W' and row['AirportCode'] in map.keys()) else row['State'],axis=1)
    return data

class WithExtraArgs(object):
    def __init__(self, func, **args):
        self.func = func
        self.args = args
    def __call__(self, df):
        return self.func(df, **self.args)

def applyParallel(pool,data, func, kwargs):
    data_split = np.array_split(data, min(partitions,data.shape[0]))
    data = pd.concat(pool.map(WithExtraArgs(func, **kwargs), data_split))
    return data

def parallelize(data, func,pool):
    data_split = np.array_split(data, partitions)
    data = pd.concat(pool.map(func, data_split))
    return data


In [3]:
filepath = '../../AllEvents_Aug2016-Aug2018.csv'
raduis = 14 # mi

cores = cpu_count() #Number of CPU cores on your system
partitions = cores #Define as many partitions as you want
print ("number of cores "+str(partitions))

#load data
use_cols=['EventId','Type',	'RefinedType_',	'StartTime'	
    ,'EndTime',	'LocationLat',	'LocationLng',	'Distance(mi)'	
    ,'AirportCode',	'Number',	'Street',	'Side',	'City',	'County'	
    ,'State',	'ZipCode']

data_set = pd.read_csv(filepath,low_memory=True)
data_set.columns = use_cols

number of cores 28


  interactivity=interactivity, compiler=compiler, result=result)


In [4]:
data_set.StartTime = pd.to_datetime(data_set.StartTime, format="%Y-%m-%d %H:%M:%S", errors='coerce')
data_set.EndTime = pd.to_datetime(data_set.EndTime, format="%Y-%m-%d %H:%M:%S", errors='coerce')
data_set = data_set.drop(data_set[data_set.StartTime.isnull()].index)
data_set = data_set.drop(data_set[data_set.EndTime.isnull()].index)

print ("Data set is loaded")
print_memory_usage()

Data set is loaded
memory log:
 5.68 GB (RSS)
 6.43 GB (VMS)
 8.95 GB (Used)
116.13 GB (Available)
125.81 GB (Total)


In [5]:
airport_mappath="../../AirportToCityState.csv"
airport_df = pd.read_csv(airport_mappath,low_memory=True)
airport_df['State'] = airport_df.CityState.apply(lambda x: x.split('-')[1])
airport_df['City'] = airport_df.CityState.apply(lambda x: x.split('-')[0])
airport_df = airport_df.set_index('Airport')
airport_df = airport_df[['State','City']]
airport_map = airport_df.to_dict('index')

In [6]:
print ("set state-city for weather events")
pool = Pool(cores)
data_set = applyParallel (pool, data_set, State_set,{'map':airport_map.copy()})
pool.close()
pool.join()
print ("setting state-city for weather events done")
print ("*"*80)

set state-city for weather events
setting state-city for weather events done
********************************************************************************


In [7]:
invalid_entity = data_set[data_set.City.isnull() | data_set.State.isnull()]
display(invalid_entity.shape)
data_set = data_set.drop(invalid_entity.index)
invalid_entity = data_set[data_set.City.isnull() | data_set.State.isnull()]
display(invalid_entity.shape)
#data_set = data_set.reset_index()

(2128, 16)

(0, 16)

In [8]:
print ("modify start end time type")
pool = Pool(cores)
data_set = parallelize (data_set,type_modif,pool)
pool.close()
pool.join()
print ("type modification for start_end time is done")
print ("*"*80)

modify start end time type
type modification for start_end time is done
********************************************************************************


In [9]:
print ("compute the duration of events")
pool = Pool(cores)
data_set = parallelize (data_set,days_between,pool)#traffic_events.apply(days_between,axis=1)
pool.close()
pool.join()
#display(data_set)
print_memory_usage()
print ("duration computation is done")

compute the duration of events
memory log:
 6.77 GB (RSS)
 7.64 GB (VMS)
10.10 GB (Used)
114.98 GB (Available)
125.81 GB (Total)
duration computation is done


In [10]:
print ("summary of duration:")
summary = data_set.duration.describe()
display(summary)
print ("*"*80)


data_histo = data_set[np.abs(data_set.duration-data_set.duration.mean())<=(10*data_set.duration.std())] 
data_histo.duration/=60
fig, ax = plt.subplots()
data_histo.duration.hist(ax=ax, bins=100, bottom=0.1)
ax.set_yscale('log')
plt.xlabel('event duration in hour')
plt.ylabel('log scale events count')
plt.title('Event Duration Distribution')
plt.savefig('all_event_du_dist.png')

summary of duration:


count    1.532509e+07
mean     2.764785e+02
std      1.051573e+04
min      0.000000e+00
25%      2.978333e+01
50%      4.281667e+01
75%      5.010000e+01
max      1.209753e+07
Name: duration, dtype: float64

********************************************************************************


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  self[name] = value


In [11]:
print ("events type:")
d = data_set.groupby('RefinedType').size().to_frame(name='name')
display(d)

#save data_set 
data_set.to_hdf('../../data_set.h5',key='DS_new')
print ("dataset is saved")

events type:


Unnamed: 0_level_0,name
RefinedType,Unnamed: 1_level_1
Accident,1169481
Broken,308110
Congestion,10541613
Construction,209923
Event,32817
Incident,637478
Lane,246828
Other,1561
cold,67274
fog,454282


your performance may suffer as PyTables will pickle object types that it cannot
map directly to c-types [inferred_type->mixed,key->block2_values] [items->['EventId', 'Type', 'RefinedType_', 'AirportCode', 'Street', 'Side', 'City', 'County', 'State', 'RefinedType']]

  return pytables.to_hdf(path_or_buf, key, self, **kwargs)


dataset is saved
