In [3]:
# Install only the required packages that are not already available, then
# attach them so the pipeline functions below can use %>%, the dplyr verbs
# and the sparklyr connection functions.
required_pkgs <- c("sparklyr", "dplyr")
is_available <- vapply(required_pkgs, requireNamespace, logical(1),
                       quietly = TRUE)
if (any(!is_available)) {
  install.packages(required_pkgs[!is_available])
}
library(sparklyr)
library(dplyr)


The downloaded binary packages are in
	/var/folders/wx/19gqb3qs7039s9vsdwhxr9fr0000gn/T//RtmpdFPqql/downloaded_packages


In [43]:
filterData <- function(df, years = 2018:2021, region = "London") {
  # Keep only rows whose count_date falls in one of `years` and whose
  # region_name equals `region`.
  #
  # Args:
  #   df: a data frame or Spark table with count_date and region_name columns.
  #   years: integer vector of calendar years to keep (default 2018-2021).
  #   region: region_name value to keep (default "London").
  # Returns:
  #   The filtered df (same type as the input).
  #
  # NOTE(review): year() is not defined in the visible R code; presumably it
  # is passed through to Spark SQL's YEAR() by sparklyr. Confirm before using
  # this on a local data frame.
  # `!!` inlines the argument values so that a column with the same name
  # (e.g. a `year` column) cannot shadow them inside the filter expression.
  df %>%
    filter(year(count_date) %in% !!years,
           region_name == !!region)
}
# ------------------------------------------------------------------------------

# ------------------------- TODO: Add counts for both directions ---------------

addCounts <- function(df,
                      count_cols = c("cars_and_taxis",
                                     "buses_and_coaches",
                                     "lgvs"),
                      output_file = "addedData.csv") {
  # Sum vehicle counts per (local_authority_name, hour, count_date).
  #
  # Counts are measured in both directions, so adding the rows that share the
  # same local authority, hour and date gives the total traffic that passes
  # through the area.
  # TODO: add all the other vehicle types to `count_cols`.
  #
  # Args:
  #   df: a data frame or Spark table containing local_authority_name, hour,
  #       count_date and every column named in `count_cols`.
  #   count_cols: names of the count columns to sum.
  #   output_file: path of the CSV the result is written to, or NULL to skip
  #                writing it.
  # Returns:
  #   A local data frame (collected from Spark if needed) with one row per
  #   group, sorted by count_date, hour, then local_authority_name.
  agg_df <- df %>%
    group_by(local_authority_name, hour, count_date) %>%
    # dplyr verbs run inside Spark for a tbl_spark; base aggregate() and
    # write.csv() do not work on a lazy Spark table.
    summarise(across(all_of(count_cols), ~ sum(.x, na.rm = TRUE)),
              .groups = "drop") %>%
    # Deterministic row order for the CSV output.
    arrange(count_date, hour, local_authority_name) %>%
    collect()
  if (!is.null(output_file)) {
    write.csv(agg_df, output_file, row.names = FALSE)
  }
  agg_df
}
# ------------------------------------------------------------------------------

# -------------------------- Remove Outliers -----------------------------------
remOutliers <- function(df, threshold = 3) {
  # Remove rows whose all_motor_vehicles value lies more than `threshold`
  # sample standard deviations away from the column mean.
  #
  # Args:
  #   df: a local data frame or a Spark table (tbl_spark) with an
  #       all_motor_vehicles column.
  #   threshold: number of standard deviations allowed (default 3).
  # Returns:
  #   df with all_motor_vehicles converted to double and outlier rows removed.
  #   Rows with a missing all_motor_vehicles value are dropped too. If the
  #   standard deviation cannot be computed (fewer than two non-missing
  #   values), df is returned unfiltered.
  is_spark <- inherits(df, "tbl_spark")
  if (is_spark) {
    # `$` does not extract a column from a Spark table (it returns NULL), so
    # the cast and the summary statistics are pushed down to Spark instead.
    df <- dplyr::mutate(df,
                        all_motor_vehicles = as.numeric(all_motor_vehicles))
    stats <- dplyr::collect(dplyr::summarise(
      df,
      mean_value = mean(all_motor_vehicles, na.rm = TRUE),
      sd_value = sd(all_motor_vehicles, na.rm = TRUE)
    ))
    mean_value <- stats$mean_value
    sd_value <- stats$sd_value
  } else {
    df$all_motor_vehicles <- as.numeric(df$all_motor_vehicles)
    mean_value <- mean(df$all_motor_vehicles, na.rm = TRUE)
    sd_value <- sd(df$all_motor_vehicles, na.rm = TRUE)
  }
  # Without a usable SD there is no basis for calling any value an outlier.
  if (length(sd_value) != 1L || is.na(sd_value)) {
    return(df)
  }
  lower <- mean_value - threshold * sd_value
  upper <- mean_value + threshold * sd_value
  # Inclusive bounds: only values that *exceed* threshold * SD are removed,
  # so a constant column (SD of 0) keeps every row.
  if (is_spark) {
    dplyr::filter(df,
                  all_motor_vehicles >= !!lower,
                  all_motor_vehicles <= !!upper)
  } else {
    x <- df$all_motor_vehicles
    df[!is.na(x) & x >= lower & x <= upper, , drop = FALSE]
  }
}
# ------------------------------------------------------------------------------

# --------------------- Check/Remove missing Values ----------------------------
checkMissing <- function(df) {
  # Drop every column that contains at least one missing value.
  #
  # The per-column NA counts are computed where the data lives (inside Spark
  # for a tbl_spark), so only a one-row summary is collected into R instead
  # of the whole table.
  #
  # Args:
  #   df: a data frame or Spark table.
  # Returns:
  #   df restricted to the columns that have no missing values.
  na_counts <- df %>%
    summarise(across(everything(),
                     ~ sum(as.integer(is.na(.x)), na.rm = TRUE))) %>%
    collect() %>%
    unlist()
  # SQL SUM over zero rows yields NULL (NA here); an empty table has no
  # missing values, so those columns are kept.
  cols_to_keep <- names(na_counts)[is.na(na_counts) | na_counts == 0]
  df %>% select(all_of(cols_to_keep))
}
# --------------------- Change format of time ----------------------------------
formatTime <- function(df) {
  # Rewrite the integer `hour` column as a zero-padded "HH:00:00" string,
  # e.g. 7 -> "07:00:00" and 14 -> "14:00:00". Hours of 10 and above are
  # used as they are; a missing hour stays missing.
  df %>%
    mutate(
      hour = ifelse(hour >= 10,
                    paste0(hour, ":00:00"),
                    paste0("0", hour, ":00:00"))
    )
}
# ------------------------------------------------------------------------------

main <- function(path = "vehicleCount.csv", master = "local") {
  # Run the vehicle-count pipeline on a Spark connection.
  #
  # Args:
  #   path: CSV file of vehicle counts to read.
  #   master: Spark master URL passed to spark_connect().
  # Returns:
  #   Invisibly, the output of the last pipeline step. If that is a Spark
  #   table, it can no longer be queried after this function returns,
  #   because the connection is closed on exit.
  sc <- spark_connect(master = master)
  # Close the connection even when a pipeline step fails. Otherwise a failed
  # run leaves it open and the next spark_connect() silently re-uses it.
  on.exit(spark_disconnect(sc), add = TRUE)
  # Read vehicle count csv file into a Spark data frame
  df <- spark_read_csv(sc, path, infer_schema = TRUE)
  # Pipeline for transforming data
  result <- df %>%
    # Filter by location and date
    filterData() %>%
    # Remove columns with missing values
    checkMissing() %>%
    # Format the time
    formatTime() %>%
    # Remove outliers (currently disabled)
    # remOutliers() %>%
    # Sum counts across both directions of travel
    addCounts()
  invisible(result)
}

# Run the whole Spark pipeline defined in main().
main()


Re-using existing Spark connection to local



In [36]:
# `sc` is created locally inside main(), so no top-level `sc` exists unless it
# is defined elsewhere. Close every open Spark connection instead, e.g. one
# left behind by a run of main() that failed before disconnecting.
spark_disconnect_all()