In [1]:
import os

import pandas as pd
from yaml import load as yaml_load

import findspark
findspark.init()  # must run before pyspark is imported
import pyspark
import pyspark.sql.functions as func
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession
# Single shared Spark session for the whole notebook (local master).
spark = (
    SparkSession.builder
    .master("local")
    .appName("Data cleaning")
    .getOrCreate()
)

In [2]:
# Handle on the SparkContext behind `spark`; not referenced by the cells shown in this part of the notebook.
sc = spark.sparkContext

In [3]:
import json

In [4]:
def _load_config_file(config_file):
    """
    Load a YAML configuration file.

    Uses ``yaml.safe_load``. Plain ``yaml.load`` without an explicit
    ``Loader`` can construct arbitrary Python objects, warns on PyYAML 5.x,
    and fails with ``TypeError`` on PyYAML >= 6.

    :param config_file: path to the YAML configuration file
    :type config_file: str
    :return: parsed configuration (a dict for a mapping-style file)
    :rtype: dict
    """
    from yaml import safe_load

    with open(config_file) as yml_config:
        return safe_load(yml_config)

def _build_configuration(config_file):
    """
    Build the global configuration dict from a YAML file.

    :param config_file: path to the YAML configuration file
    :type config_file: str
    :return: the configuration exactly as returned by ``_load_config_file``
    :rtype: dict
    """
    return _load_config_file(config_file)

def normalize(df, columns):
    """
    Mean-centre the given columns of a Spark DataFrame.

    Each output column is ``column - mean(column)``. The means are computed in
    one aggregation and collected to the driver. Only the listed columns are
    returned, under their original names. No scaling by the standard deviation
    is applied.

    :param df: input Spark DataFrame
    :param columns: names of the numeric columns to centre
    :return: DataFrame containing only the centred ``columns``
    """
    # `mean` was never imported; the aggregate lives in pyspark.sql.functions.
    averages = df.agg(*[func.mean(df[name]).alias(name) for name in columns]).collect()[0]
    # Alias back to the source name; otherwise Spark names each column "(x - <mean>)".
    return df.select([(df[name] - averages[name]).alias(name) for name in columns])

def get_dummies(df, list_columns):
    """
    One-hot encode columns of a Spark DataFrame using ``groupBy().pivot()``.

    For every column ``c`` in ``list_columns``, the rows are grouped by *all*
    columns of ``df`` and pivoted on ``c``. Each distinct value ``v`` becomes
    a column ``"c_v"`` holding the number of identical input rows with that
    value (0 after ``fillna``). Because every column is a grouping key, exact
    duplicate rows collapse into one output row. The original columns,
    including ``c`` itself, are kept. The per-column results are then
    inner-joined on the original columns.

    :param df: input Spark DataFrame
    :param list_columns: names of the columns to encode
    :return: original columns plus the dummy columns; ``df`` unchanged when
        ``list_columns`` is empty
    """
    from functools import reduce

    if not list_columns:
        return df

    keys = df.columns
    encoded = []
    for pivot_col in list_columns:
        # Always pivot the ORIGINAL frame. Reassigning `df` here made every
        # later pivot count rows of the previous, already aggregated result, so
        # its dummies were always 1 while the first column's were row counts.
        pivot_df = df.groupBy(keys).pivot(pivot_col).count()
        dummy_names = ["{0}_{1}".format(pivot_col, c) for c in pivot_df.columns[len(keys):]]
        encoded.append(pivot_df.toDF(*(keys + dummy_names)).fillna(0))

    return reduce(lambda left, right: left.join(right, on=keys, how='inner'), encoded)

In [5]:
# Set CRIMES_CONFIG_FILE to point at the YAML config on another machine; the
# default is a machine-specific absolute path.
config_file = os.environ.get(
    "CRIMES_CONFIG_FILE", "/home/ml/Documents/crimes_chigaco/config/config.yml"
)
config = _build_configuration(config_file)

In [6]:
def path_crime():
    """
    Location of the crimes CSV file.

    :return: ``config['connect']['PathCrimes']``
    """
    connect = config['connect']
    return connect['PathCrimes']

def path_socio():
    """
    Location of the socio-economic CSV file.

    :return: ``config['connect']['PathSocioEco']``
    """
    connect = config['connect']
    return connect['PathSocioEco']

def path_columns():
    """
    Location of the JSON file holding the per-dataset column renames.

    :return: ``config['connect']['Pathcolumns']``
    """
    connect = config['connect']
    return connect['Pathcolumns']

def path_temperature():
    """
    Location of the temperature CSV file.

    :return: ``config['connect']['PathTemperature']``
    """
    connect = config['connect']
    return connect['PathTemperature']

def path_sky():
    """
    Location of the sky-conditions CSV file.

    :return: ``config['connect']['PathSky']``
    """
    connect = config['connect']
    return connect['PathSky']
def spark_shape(self):
    """
    Return ``(row_count, column_count)`` of a Spark DataFrame, pandas-style.

    Note that this triggers a full ``count()`` job.
    """
    n_rows = self.count()
    n_cols = len(self.columns)
    return n_rows, n_cols
# Expose spark_shape as a method: call it as df.shape() (it is not a property as in pandas).
setattr(pyspark.sql.dataframe.DataFrame, "shape", spark_shape)

In [7]:
# Per-dataset column-rename maps (old name -> new name) from the configured JSON file.
# `with` closes the file handle, which the previous one-liner leaked.
with open(path_columns()) as columns_file:
    column_name = json.load(columns_file)

In [8]:
def df_crime():
    """
    Load the crimes CSV into a Spark DataFrame with normalised column names.

    Columns are renamed with the ``DataCrimes`` map of the columns JSON file,
    and ``date`` is parsed to a timestamp. When
    ``config["List_of_crimes_prediction"]["with_merge"]`` is true, values of
    ``primary_type`` are rewritten with the ``to_merge`` map.

    :return: crimes DataFrame
    :rtype: pyspark.sql.DataFrame
    """
    with open(path_columns()) as columns_file:
        column_name = json.load(columns_file)
    df_crimes = (spark.read.format("csv")
                 .option("header", "true")
                 .option("mode", "DROPMALFORMED")
                 .option("delimiter", ";")
                 .load(path_crime()))
    for old_name, new_name in column_name['DataCrimes'].items():
        df_crimes = df_crimes.withColumnRenamed(old_name, new_name)
    # A single 'a' is the am/pm marker; Spark 3's datetime parser rejects 'aaa'.
    df_crimes = df_crimes.withColumn("date", func.to_timestamp("date", "MM/dd/yyyy hh:mm:ss a"))

    merge_cfg = config["List_of_crimes_prediction"]
    if merge_cfg["with_merge"]:
        # NOTE(review): assumes to_merge is an {old_value: new_value} dict.
        # The column must go in `subset`; passed positionally it became the
        # (ignored) `value` argument and every string column was rewritten.
        df_crimes = df_crimes.na.replace(merge_cfg["to_merge"], subset=['primary_type'])
    return df_crimes

In [9]:
def df_socio():
    """
    Load the socio-economic CSV into a Spark DataFrame with normalised column names.

    Columns are renamed with the ``SocioEco`` map of the columns JSON file.
    No schema inference is requested, so every column is read as a string.

    TODO: the pandas loader min-max scales pct_housing_crowded,
    pct_households_below_poverty, pct_age16_unemployed,
    pct_age25_no_highschool, pct_not_working_age, per_capita_income and
    hardship_index. That scaling is not implemented in this Spark version.

    :return: socio-economic DataFrame
    :rtype: pyspark.sql.DataFrame
    """
    with open(path_columns()) as columns_file:
        column_name = json.load(columns_file)
    socio = (spark.read.format("csv")
             .option("header", "true")
             .option("mode", "DROPMALFORMED")
             .option("delimiter", ",")
             .load(path_socio()))
    for old_name, new_name in column_name['SocioEco'].items():
        socio = socio.withColumnRenamed(old_name, new_name)
    return socio

In [27]:
def df_merged_spark():
    """
    Left-join the raw crimes data with the socio-economic data (Spark).

    The crimes CSV is read with the same options as ``df_crime`` and renamed
    with the ``DataCrimes`` map. As in the pandas ``df_merged``, no date
    parsing and no crime-type merging is applied.

    :return: one row per crime with the matching socio-economic columns
        (null where the community area has no match)
    :rtype: pyspark.sql.DataFrame
    """
    # The body used `self` and pandas (`pd`) inside a free function, so every
    # call raised NameError; this is the Spark equivalent.
    with open(path_columns()) as columns_file:
        column_name = json.load(columns_file)
    crimes = (spark.read.format("csv")
              .option("header", "true")
              .option("mode", "DROPMALFORMED")
              .option("delimiter", ";")
              .load(path_crime()))
    for old_name, new_name in column_name['DataCrimes'].items():
        crimes = crimes.withColumnRenamed(old_name, new_name)
    return crimes.join(df_socio(), on='community_area_number', how='left')


In [13]:
def df_nb_crimes_spark(start_date='2015-03-18', end_date='2017-01-15'):
    """
    Monthly crime counts per community area, joined with socio data and one-hot encoded.

    Crimes are restricted to ``config['NameCrime']`` and to dates strictly
    between ``start_date`` and ``end_date``. They are counted per
    (community_area_number, month, year, primary_type), inner-joined with
    ``df_socio()``, and passed to ``get_dummies`` for primary_type,
    community_area_name and month. A crime type seen in the full history but
    absent from the window still gets a ``primary_type_<name>`` column, filled
    with -1, as in the pandas ``df_nb_crimes``.

    :param start_date: exclusive lower bound on ``date``
    :param end_date: exclusive upper bound on ``date``
    :return: encoded feature DataFrame
    :rtype: pyspark.sql.DataFrame
    """
    df_S = df_socio()
    df_C = df_crime().filter(func.col('primary_type').isin(config['NameCrime']))
    # Was `df_crime_`, a name that only existed as leftover kernel state.
    name_rows = df_C.select("primary_type").distinct().collect()
    list_name_crimes = [row[0] for row in name_rows]

    df_year = df_C.filter((func.col('date') > start_date) & (func.col('date') < end_date))
    df_m = (df_year
            .withColumn("month", func.month(func.col("date")))
            .withColumn("year", func.year(func.col("date"))))
    df_nb_crimes = (df_m
                    .groupBy('community_area_number', 'month', 'year', 'primary_type')
                    .agg(func.count(df_m.id).alias('nb_crimes')))
    df_merged = df_nb_crimes.join(df_S, ['community_area_number'], "inner")
    df_result = get_dummies(df_merged, list_columns=['primary_type', 'community_area_name', 'month'])

    present = set(df_result.columns)
    for crime in list_name_crimes:
        column = "primary_type_{0}".format(crime)
        if column not in present:
            df_result = df_result.withColumn(column, func.lit(-1))
    return df_result

In [14]:
%%time
# Build the Spark monthly crime-count feature table; %%time reports its cost.
df_nb_crimes_spark_ = df_nb_crimes_spark()



CPU times: user 75.4 ms, sys: 4.14 ms, total: 79.5 ms
Wall time: 2min 24s


In [16]:
def df_temperature_spark(start='2017-05-10', end='2017-05-15'):
    """
    Chicago temperatures in a date window, with calendar columns added.

    Reads the temperature CSV, keeps the ``Chicago`` column (renamed
    ``Temperature`` and cast to double) and ``datetime``. It then keeps rows
    with ``start < datetime < end`` and adds month, year, day and hour.

    :param start: exclusive lower bound, compared as text against ``datetime``
    :param end: exclusive upper bound, compared as text against ``datetime``
    :return: temperature DataFrame
    :rtype: pyspark.sql.DataFrame
    """
    df = (spark.read.format("csv")
          .option("header", "true")
          .option("mode", "DROPMALFORMED")
          .option("delimiter", ",")
          .load(path_temperature()))
    # No schema inference: every column is a string. Cast so the feature is numeric.
    df = (df.select('Chicago', 'datetime')
            .withColumnRenamed('Chicago', 'Temperature')
            .withColumn('Temperature', func.col('Temperature').cast('double')))
    # `datetime` is a string here; this relies on ISO 'yyyy-MM-dd HH:mm:ss' text ordering.
    df = df.filter((func.col('datetime') > start) & (func.col('datetime') < end))
    return (df.withColumn("month", func.month(func.col("datetime")))
              .withColumn("year", func.year(func.col("datetime")))
              .withColumn("day", func.dayofmonth(func.col("datetime")))
              .withColumn("hour", func.hour(func.col("datetime"))))

In [25]:
# Temperature rows for the date window applied inside df_temperature_spark.
df_temp = df_temperature_spark()

In [18]:
def df_sky(start='2017-05-10', end='2017-05-15'):
    """
    One-hot encoded Chicago sky conditions in a date window, with calendar columns.

    Keeps ``Chicago`` and ``datetime`` from the sky CSV and drops rows with a
    missing value (as the pandas loader does). It then keeps rows with
    ``start < datetime < end``, encodes ``Chicago`` with ``get_dummies``, and
    adds month, year, day and hour.

    :param start: exclusive lower bound, compared as text against ``datetime``
    :param end: exclusive upper bound, compared as text against ``datetime``
    :return: sky-conditions DataFrame
    :rtype: pyspark.sql.DataFrame
    """
    df = (spark.read.format("csv")
          .option("header", "true")
          .option("mode", "DROPMALFORMED")
          .option("delimiter", ",")
          .load(path_sky()))
    # Without dropping nulls, the pivot in get_dummies adds a "Chicago_null" column.
    df = df.select('Chicago', 'datetime').dropna()
    df = df.filter((func.col('datetime') > start) & (func.col('datetime') < end))
    df = get_dummies(df, list_columns=['Chicago'])
    return (df.withColumn("month", func.month(func.col("datetime")))
              .withColumn("year", func.year(func.col("datetime")))
              .withColumn("day", func.dayofmonth(func.col("datetime")))
              .withColumn("hour", func.hour(func.col("datetime"))))

In [19]:
# One-hot encoded sky conditions for the date window applied inside df_sky.
df_sky_ = df_sky()

In [20]:
# Fixed random_state so the displayed sample is the same on every re-run.
df_sky_.toPandas().sample(5, random_state=0)

Unnamed: 0,Chicago,datetime,Chicago_broken clouds,Chicago_few clouds,Chicago_fog,Chicago_haze,Chicago_heavy intensity rain,Chicago_light rain,Chicago_mist,Chicago_overcast clouds,Chicago_scattered clouds,Chicago_sky is clear,Chicago_thunderstorm,month,year,day,hour
68,overcast clouds,2017-05-10 09:00:00,0,0,0,0,0,0,0,1,0,0,0,5,2017,10,9
54,heavy intensity rain,2017-05-11 02:00:00,0,0,0,0,1,0,0,0,0,0,0,5,2017,11,2
108,overcast clouds,2017-05-10 06:00:00,0,0,0,0,0,0,0,1,0,0,0,5,2017,10,6
47,mist,2017-05-11 05:00:00,0,0,0,0,0,0,1,0,0,0,0,5,2017,11,5
94,sky is clear,2017-05-12 04:00:00,0,0,0,0,0,0,0,0,0,1,0,5,2017,12,4


In [26]:
# Fixed random_state so the displayed sample is the same on every re-run.
df_temp.toPandas().sample(5, random_state=0)

Unnamed: 0,Temperature,datetime,month,year,day,hour
111,290.91,2017-05-14 15:00:00,5,2017,14,15
36,283.25,2017-05-11 12:00:00,5,2017,11,12
109,287.12,2017-05-14 13:00:00,5,2017,14,13
65,291.33,2017-05-12 17:00:00,5,2017,12,17
81,281.74,2017-05-13 09:00:00,5,2017,13,9


In [None]:
class LoadDataframe:
    """
    pandas loaders for the Chicago crimes, socio-economic and weather datasets.

    File locations are read from ``config['connect']``. Column renames are read
    from the JSON file at ``config['connect']['Pathcolumns']``. Methods that
    restrict rows to a time window keep rows in ``[start_year, end_year)``;
    a bound that is ``None`` is not applied.
    """

    # Socio-economic indicators that df_socio min-max scales to [0, 1].
    _SOCIO_COLUMNS_TO_NORMALIZE = (
        'pct_housing_crowded', 'pct_households_below_poverty',
        'pct_age16_unemployed', 'pct_age25_no_highschool',
        'pct_not_working_age', 'per_capita_income', 'hardship_index',
    )

    def __init__(self, config, start_year=None, end_year=None):
        """
        :param config: configuration dict (as loaded from the YAML config file)
        :param start_year: inclusive lower bound of the time window, or None.
            Anything ``pd.Timestamp`` can parse from its ``str()`` form works,
            e.g. ``2015``, ``'2015-03-18'`` or a ``datetime``.
        :param end_year: exclusive upper bound of the time window, or None.
        """
        self._config = config
        # Several methods read these bounds, which were previously never set
        # (AttributeError on first use).
        self._start_year = self._as_timestamp(start_year)
        self._end_year = self._as_timestamp(end_year)

    @staticmethod
    def _as_timestamp(value):
        """Convert a period bound to ``pd.Timestamp``; None stays None."""
        # str() first, so that an int year such as 2015 is parsed as a year
        # rather than as nanoseconds since the epoch.
        return None if value is None else pd.Timestamp(str(value))

    def _in_period(self, dates):
        """Boolean mask selecting the entries of ``dates`` inside ``[start_year, end_year)``."""
        start = getattr(self, '_start_year', None)
        end = getattr(self, '_end_year', None)
        mask = pd.Series(True, index=dates.index)
        if start is not None:
            mask &= dates >= start
        if end is not None:
            mask &= dates < end
        return mask

    def _column_names(self):
        """Load the column-rename JSON file (per-dataset ``{old: new}`` maps)."""
        with open(self.path_columns()) as columns_file:
            return json.load(columns_file)

    def path_crime(self):
        """
        :return: ``config['connect']['PathCrimes']``
        """
        return self._config['connect']['PathCrimes']

    def path_socio(self):
        """
        :return: ``config['connect']['PathSocioEco']``
        """
        return self._config['connect']['PathSocioEco']

    def path_columns(self):
        """
        :return: ``config['connect']['Pathcolumns']``
        """
        return self._config['connect']['Pathcolumns']

    def path_temperature(self):
        """
        :return: ``config['connect']['PathTemperature']``
        """
        return self._config['connect']['PathTemperature']

    def path_sky(self):
        """
        :return: ``config['connect']['PathSky']``
        """
        return self._config['connect']['PathSky']

    def df_crime(self):
        """
        Load the crimes CSV (';'-separated) with renamed columns.

        The raw ``Date`` column is parsed as datetime before renaming. When
        ``config["List_of_crimes_prediction"]["with_merge"]`` is true, the
        ``to_merge`` map is applied to ``primary_type`` values.

        :return: crimes DataFrame
        :rtype: pandas.DataFrame
        """
        column_name = self._column_names()
        crimes = pd.read_csv(self.path_crime(), sep=';', parse_dates=['Date'])
        crimes = crimes.rename(columns=column_name['DataCrimes'])
        merge_cfg = self._config["List_of_crimes_prediction"]
        if merge_cfg["with_merge"]:
            crimes = crimes.replace({'primary_type': merge_cfg["to_merge"]})
        return crimes

    def df_socio(self):
        """
        Load the socio-economic CSV and min-max scale its indicator columns.

        Each column in ``_SOCIO_COLUMNS_TO_NORMALIZE`` becomes
        ``(x - min) / (max - min)``. A constant column (zero range) becomes 0,
        the same convention as ``sklearn.preprocessing.MinMaxScaler``.

        :return: socio-economic DataFrame
        :rtype: pandas.DataFrame
        """
        column_name = self._column_names()
        socio = pd.read_csv(self.path_socio()).rename(columns=column_name['SocioEco'])
        columns = list(self._SOCIO_COLUMNS_TO_NORMALIZE)
        values = socio[columns].astype(float)
        low = values.min()
        span = values.max() - low
        # Divide by 1 for zero-range columns instead of producing NaN/inf.
        span = span.where(span != 0, 1.0)
        socio[columns] = (values - low) / span
        return socio

    def df_temperature(self):
        """
        Chicago temperatures inside the time window, with calendar columns.

        :return: columns Temperature, month, day, hours
        :rtype: pandas.DataFrame
        """
        df = pd.read_csv(self.path_temperature(), parse_dates=['datetime'])
        df = df[['datetime', 'Chicago']].rename(columns={'Chicago': 'Temperature'})
        # Building new frames (no inplace edits on slices) avoids
        # SettingWithCopy warnings, so the global pandas option is left alone.
        df = df[self._in_period(df['datetime'])]
        return df.assign(
            month=lambda d: d['datetime'].dt.month,
            day=lambda d: d['datetime'].dt.day,
            hours=lambda d: d['datetime'].dt.hour,
        ).drop(columns='datetime')

    def df_sky(self):
        """
        One-hot encoded Chicago sky conditions inside the time window.

        Rows with a missing value are dropped before encoding.

        :return: ``Chicago_<condition>`` dummy columns plus month, day, hours
        :rtype: pandas.DataFrame
        """
        df = pd.read_csv(self.path_sky(), parse_dates=['datetime'])
        df = df[['Chicago', 'datetime']].dropna()
        df = pd.get_dummies(df, columns=['Chicago'])
        df = df[self._in_period(df['datetime'])]
        return df.assign(
            month=lambda d: d['datetime'].dt.month,
            day=lambda d: d['datetime'].dt.day,
            hours=lambda d: d['datetime'].dt.hour,
        ).drop(columns='datetime')

    def df_merged(self):
        """
        Left-join the raw crimes CSV (no date parsing, no crime merging) with ``df_socio``.

        :return: one row per crime with its community area's socio-economic columns
        :rtype: pandas.DataFrame
        """
        column_name = self._column_names()
        crimes = pd.read_csv(self.path_crime(), sep=';').rename(columns=column_name['DataCrimes'])
        return pd.merge(crimes, self.df_socio(), on='community_area_number', how='left')

    def df_crime_socio(self):
        """
        Left-join ``df_crime`` restricted to the time window with ``df_socio``.

        (An earlier, shadowed duplicate of this method was removed; it behaved
        like ``df_merged``.)

        :return: crimes in the window with socio-economic columns
        :rtype: pandas.DataFrame
        """
        crimes = self.df_crime()
        crimes = crimes[self._in_period(crimes['date'])]
        return pd.merge(crimes, self.df_socio(), on='community_area_number', how='left')

    def df_nb_crimes(self):
        """
        Monthly crime counts per community area, joined with socio data and one-hot encoded.

        Crimes are restricted to ``config["NameCrime"]`` and the time window,
        then counted per (community_area_number, month, year, primary_type).
        The counts are inner-joined with ``df_socio``, rows with any NaN are
        dropped, year and community_area_number are removed, and primary_type,
        community_area_name and month are one-hot encoded. A crime type seen
        outside the window but not inside it gets a ``primary_type_<name>``
        column filled with -1.

        :return: encoded feature DataFrame with a ``nb_crimes`` column
        :rtype: pandas.DataFrame
        """
        socio = self.df_socio()
        crimes = self.df_crime()
        crimes = crimes[crimes['primary_type'].isin(self._config["NameCrime"])]
        list_name_crimes = list(crimes['primary_type'].unique())

        in_window = crimes[self._in_period(crimes['date'])]
        dates = pd.to_datetime(in_window['date'])
        in_window = in_window.assign(month=dates.dt.month, year=dates.dt.year)

        grouped = (in_window
                   .groupby(['community_area_number', 'month', 'year', 'primary_type'], as_index=False)
                   .agg({'id': 'count'})
                   .rename(columns={'id': 'nb_crimes'}))
        merged = (pd.merge(grouped, socio, on='community_area_number', how='inner')
                  .dropna()
                  .drop(['year', 'community_area_number'], axis=1))
        encoded = pd.get_dummies(merged, columns=['primary_type', 'community_area_name', 'month'])

        present = set(encoded.columns)
        for crime in list_name_crimes:
            column = "primary_type_" + crime
            if column not in present:
                encoded[column] = -1
        return encoded